# nodebookls/tasks.py
import os
import tempfile

from celery_app import celery_app
from di_container import container
from config import settings


@celery_app.task(bind=True)
def process_files_task(self, file_contents: dict) -> dict:
    """
    Celery task that processes uploaded files asynchronously.

    Args:
        self: Celery task instance.
        file_contents: Mapping of filename to file content.

    Returns:
        Dictionary with processing results.
    """
    temp_files = []
    try:
        # Report initial progress
        self.update_state(state='PROGRESS', meta={'status': 'Initializing processing environment...'})

        # Write each upload to a temporary file on disk.
        # NOTE: the original filename and extension are not preserved;
        # every upload is written out with a .txt suffix.
        for filename, content in file_contents.items():
            with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
                temp_file.write(content.encode('utf-8') if isinstance(content, str) else content)
                temp_files.append(temp_file.name)

        # Resolve components through the DI container
        file_processor = container.get('file_processor')
        vector_store = container.get('vector_store')

        # Forward per-file progress to the Celery result backend
        def progress_callback(current, total, message):
            percent = int((current / total) * 100)
            self.update_state(
                state='PROGRESS',
                meta={
                    'status': message,
                    'current': current,
                    'total': total,
                    'percent': percent
                }
            )

        # Segment the files, filtering out empty and duplicate inputs
        processed_segments, empty_count, duplicate_count = file_processor.process_files(
            temp_files, progress_callback
        )

        self.update_state(state='PROGRESS', meta={'status': 'Vectorizing documents...'})

        # Index the resulting segments in the vector store
        if processed_segments:
            vector_store.add_documents(processed_segments)

        return {
            'status': 'SUCCESS',
            'processed_segments': len(processed_segments),
            'empty_files_filtered': empty_count,
            'duplicate_files_filtered': duplicate_count
        }
    except Exception as e:
        return {
            'status': 'FAILURE',
            'error': str(e)
        }
    finally:
        # Clean up temporary files on both success and failure
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                os.unlink(temp_file)
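
# A minimal usage sketch (not part of the original module): how a caller might
# enqueue process_files_task and poll its progress. It assumes a broker and a
# worker are running; delay() and AsyncResult are standard Celery APIs.
#
#     from celery.result import AsyncResult
#
#     result = process_files_task.delay({"notes.txt": "some text content"})
#     handle = AsyncResult(result.id, app=celery_app)
#     print(handle.state)  # e.g. 'PROGRESS' or 'SUCCESS'
#     print(handle.info)   # the progress meta, e.g. {'status': ..., 'percent': 40}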


@celery_app.task(bind=True)
def generate_text_task(self, query: str, style: str = "general copy",
                       min_length: int = 50, max_length: int = 200) -> dict:
    """
    Celery task that generates text for a query.

    Args:
        self: Celery task instance.
        query: Query string.
        style: Writing style.
        min_length: Minimum text length.
        max_length: Maximum text length.

    Returns:
        Dictionary with generation results.
    """
    try:
        self.update_state(state='PROGRESS', meta={'status': 'Retrieving relevant documents...'})

        # Resolve the vector store through the DI container,
        # consistent with process_files_task above
        vector_store = container.get('vector_store')

        # Search for relevant segments
        search_results = vector_store.search(query, settings.TOP_K)
        if not search_results:
            return {
                'status': 'FAILURE',
                'error': 'No relevant document content found'
            }

        self.update_state(state='PROGRESS', meta={'status': 'Generating text...'})

        # Extract context from the search results. In a full implementation we
        # would retrieve the actual segment content; for now only the metadata
        # is used.
        context_parts = []
        for result, score in search_results:
            context_parts.append(f"[{result['metadata']['segment_id']}] related content")
        context = "\n".join(context_parts)

        # Generate text. This would normally call the generator module; for
        # now generation is simulated with a placeholder, and min_length /
        # max_length are not yet enforced.
        generated_text = (
            f"Based on your query '{query}' and the requested {style} style, "
            f"the following content was generated:\n\n"
            f"This is sample copy generated from the related documents. In the "
            f"full implementation, this will contain text actually generated "
            f"from the document content.\n\n"
            f"Related document segments:\n{context}"
        )

        return {
            'status': 'SUCCESS',
            'query': query,
            'generated_text': generated_text,
            # In a full implementation this would flag likely hallucinations
            'hallucination_warnings': []
        }
    except Exception as e:
        return {
            'status': 'FAILURE',
            'error': str(e)
        }
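
# A minimal usage sketch (not part of the original module), assuming a running
# broker and worker: enqueue generate_text_task and block for its result.
# delay() and get() are standard Celery APIs; the 30-second timeout is arbitrary.
#
#     result = generate_text_task.delay("product launch announcement", style="general copy")
#     payload = result.get(timeout=30)  # {'status': 'SUCCESS', 'generated_text': ...}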