# nodebookls/file_processor.py

import os
import tempfile
import json
from typing import List, Dict, Tuple, Optional
from utils import get_file_hash, read_text_file, split_into_paragraphs, clean_text
from content_analyzer import analyzer
from exceptions import DocumentProcessingError
from security import security_manager


class TextFileProcessor:
    def __init__(self, upload_folder: str = "uploads"):
        self.upload_folder = upload_folder
        if not os.path.exists(upload_folder):
            os.makedirs(upload_folder)
        # Create a cache file for file hashes
        self.hash_cache_file = os.path.join(upload_folder, "hash_cache.json")
        self.load_hash_cache()

    def load_hash_cache(self):
        """Load file hash cache from disk"""
        if os.path.exists(self.hash_cache_file):
            with open(self.hash_cache_file, "r", encoding="utf-8") as f:
                self.hash_cache = json.load(f)
        else:
            self.hash_cache = {}

    def save_hash_cache(self):
        """Save file hash cache to disk"""
        with open(self.hash_cache_file, "w", encoding="utf-8") as f:
            json.dump(self.hash_cache, f, ensure_ascii=False, indent=2)

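    # The cache file written above maps each file hash to its cached entry.
    # A sketch of the assumed hash_cache.json layout (the hash key and file
    # name shown here are hypothetical; real keys come from utils.get_file_hash):
    #
    #     {
    #         "3a7bd3e2...": {
    #             "file_name": "example.txt",
    #             "segments": [...],
    #             "analysis": {...}   # present only when analysis was supplied
    #         }
    #     }
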
    def is_file_processed(self, file_path: str) -> bool:
        """Check if file has been processed before"""
        file_hash = get_file_hash(file_path)
        return file_hash in self.hash_cache

    def mark_file_as_processed(self, file_path: str, segments: List[Dict], analysis: Optional[Dict] = None):
        """Mark file as processed and cache its hash"""
        file_hash = get_file_hash(file_path)
        cache_entry = {
            "file_name": os.path.basename(file_path),
            "segments": segments
        }
        # Attach the content analysis result if provided
        if analysis:
            cache_entry["analysis"] = analysis
        self.hash_cache[file_hash] = cache_entry
        self.save_hash_cache()

    def get_cached_segments(self, file_path: str) -> List[Dict]:
        """Get cached segments for a processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache:
            return self.hash_cache[file_hash]["segments"]
        return []

    def get_cached_analysis(self, file_path: str) -> Dict:
        """Get cached analysis for a processed file"""
        file_hash = get_file_hash(file_path)
        if file_hash in self.hash_cache and "analysis" in self.hash_cache[file_hash]:
            return self.hash_cache[file_hash]["analysis"]
        return {}

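    # Note: cache lookups key on the content hash rather than the path, so a
    # file whose contents match an earlier upload is treated as already
    # processed even if it was renamed or uploaded from a different location.
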
    def process_files(self, file_paths: List[str], progress_callback=None) -> Tuple[List[Dict], int, int]:
        """
        Process uploaded text files with progress tracking
        Returns: (processed_segments, empty_files_count, duplicate_files_count)
        """
        processed_segments = []
        empty_files_count = 0
        duplicate_files_count = 0
        total_files = len(file_paths)
        for i, file_path in enumerate(file_paths):
            # Call progress callback if provided
            if progress_callback:
                progress_callback(i, total_files, f"Processing file {i+1}/{total_files}")
            # Validate the file before touching its contents
            is_valid, error_msg = security_manager.validate_file(file_path)
            if not is_valid:
                print(f"File validation failed for {file_path}: {error_msg}")
                empty_files_count += 1
                continue
            # Skip empty files
            if os.path.getsize(file_path) == 0:
                empty_files_count += 1
                continue
            # Check if file has been processed before (for incremental update)
            if self.is_file_processed(file_path):
                # Reuse cached segments
                cached_segments = self.get_cached_segments(file_path)
                processed_segments.extend(cached_segments)
                duplicate_files_count += 1
                continue
            # Process the file
            try:
                # Read and sanitize the file content
                content = read_text_file(file_path)
                content = security_manager.sanitize_text(content)
                # Clean the text and split it into paragraphs
                cleaned_content = clean_text(content)
                paragraphs = split_into_paragraphs(cleaned_content)
                # Analyze content for keywords and summary
                analysis = analyzer.analyze_content(cleaned_content)
                # Create segments with metadata
                file_name = os.path.basename(file_path)
                file_segments = []
                for j, paragraph in enumerate(paragraphs):
                    if paragraph.strip():  # Only add non-empty paragraphs
                        # Sanitize the paragraph content
                        clean_paragraph = security_manager.sanitize_text(paragraph)
                        segment = {
                            "content": clean_paragraph,
                            "metadata": {
                                "file_name": file_name,
                                "paragraph_id": j + 1,
                                "segment_id": f"{file_name}_{j + 1}",
                            },
                        }
                        file_segments.append(segment)
                        processed_segments.append(segment)
                # Cache the processed file along with its analysis
                self.mark_file_as_processed(file_path, file_segments, analysis)
            except Exception as e:
                # Log the error but continue with the remaining files
                print(f"Error processing file {file_path}: {e}")
        return processed_segments, empty_files_count, duplicate_files_count
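

# A minimal usage sketch, assuming the companion modules (utils, content_analyzer,
# security, exceptions) are importable; the file paths below are hypothetical.
if __name__ == "__main__":
    processor = TextFileProcessor(upload_folder="uploads")

    def report_progress(done: int, total: int, message: str) -> None:
        # Simple console reporter passed as progress_callback.
        print(f"[{done}/{total}] {message}")

    segments, empty_count, duplicate_count = processor.process_files(
        ["uploads/sample_a.txt", "uploads/sample_b.txt"],
        progress_callback=report_progress,
    )
    print(f"{len(segments)} segments, {empty_count} empty files, "
          f"{duplicate_count} previously processed files")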