import os
import tempfile
import json
from celery import current_task
from celery_app import celery_app
from di_container import container
from config import settings


@celery_app.task(bind=True)
def process_files_task(self, file_contents: dict) -> dict:
    """
    Celery task to process uploaded files asynchronously.

    Args:
        self: Celery task instance (bound task).
        file_contents: Mapping of filename -> file content (str or bytes).

    Returns:
        Dict with 'status' ('SUCCESS'/'FAILURE') and, on success, counts of
        processed segments and filtered files; on failure, an 'error' string.
    """
    temp_files = []  # paths of temp files created below; removed in `finally`
    try:
        self.update_state(state='PROGRESS', meta={'status': '初始化处理环境...'})

        # Materialize each uploaded content as a temp file on disk, since
        # the file processor consumes file paths rather than raw content.
        for filename, content in file_contents.items():
            with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
                temp_file.write(content.encode('utf-8') if isinstance(content, str) else content)
                temp_files.append(temp_file.name)

        # Resolve collaborators through the DI container.
        file_processor = container.get('file_processor')
        vector_store = container.get('vector_store')

        def progress_callback(current, total, message):
            # Forward fine-grained progress to the Celery result backend.
            percent = int((current / total) * 100)
            self.update_state(
                state='PROGRESS',
                meta={
                    'status': message,
                    'current': current,
                    'total': total,
                    'percent': percent
                }
            )

        processed_segments, empty_count, duplicate_count = file_processor.process_files(
            temp_files, progress_callback
        )

        self.update_state(state='PROGRESS', meta={'status': '向量化处理中...'})

        # Only index when something survived filtering.
        if processed_segments:
            vector_store.add_documents(processed_segments)

        return {
            'status': 'SUCCESS',
            'processed_segments': len(processed_segments),
            'empty_files_filtered': empty_count,
            'duplicate_files_filtered': duplicate_count
        }
    except Exception as e:
        return {
            'status': 'FAILURE',
            'error': str(e)
        }
    finally:
        # Always remove temp files, on success and failure alike.
        # (The original duplicated this cleanup in both the try body and
        # the except branch; `finally` covers both paths once.)
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                os.unlink(temp_file)


@celery_app.task(bind=True)
def generate_text_task(self, query: str, style: str = "通用文案",
                       min_length: int = 50, max_length: int = 200) -> dict:
    """
    Celery task to generate text based on a query.

    Args:
        self: Celery task instance (bound task).
        query: Query string.
        style: Writing style.
        min_length: Minimum text length (placeholder; not used yet).
        max_length: Maximum text length (placeholder; not used yet).

    Returns:
        Dict with 'status' and, on success, the query, the generated text,
        and a (currently empty) list of hallucination warnings; on failure,
        an 'error' string.
    """
    try:
        self.update_state(state='PROGRESS', meta={'status': '检索相关文档...'})

        # Bug fix: the original called VectorStore(), a name never imported
        # or defined in this module (NameError at runtime). Resolve the
        # vector store through the DI container, consistent with
        # process_files_task above.
        vector_store = container.get('vector_store')

        search_results = vector_store.search(query, settings.TOP_K)

        if not search_results:
            return {
                'status': 'FAILURE',
                'error': '未找到相关文档内容'
            }

        self.update_state(state='PROGRESS', meta={'status': '生成文案中...'})

        # Build a context string from the search hits. Only segment metadata
        # is available here; a full implementation would retrieve the actual
        # segment content.
        context_parts = []
        for result, score in search_results:
            context_parts.append(f"[{result['metadata']['segment_id']}] 相关内容")

        context = "\n".join(context_parts)

        # Placeholder generation: a full implementation would call the
        # generator module with the retrieved context.
        generated_text = f"基于您的查询 '{query}' 和要求的 {style} 风格,我们生成了以下内容:\n\n" \
                         f"这是根据相关文档内容生成的示例文案。在完整实现中,这里将包含基于文档内容生成的实际文案。\n\n" \
                         f"文档相关片段:\n{context}"

        return {
            'status': 'SUCCESS',
            'query': query,
            'generated_text': generated_text,
            'hallucination_warnings': []  # full implementation would run hallucination checks here
        }
    except Exception as e:
        return {
            'status': 'FAILURE',
            'error': str(e)
        }