"""Article-rewriting pipeline.

Reads source-article links from an Excel sheet, scrapes each supported page,
rewrites the content through a Coze AI workflow, optionally runs originality
(plagiarism) and compliance (Baidu) checks, then saves the rewritten text and
downloads the article images, fanning the work out over a thread pool.

NOTE(review): many names used below (logger, CONFIG, read_excel,
extract_content_with_retry, extract_content_until_punctuation,
safe_open_directory, handle_duplicate_files_advanced, text_detection,
ARTICLES_BASE_PATH, IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS) are assumed
to come from the star imports — confirm against utils / get_web_content /
config.
"""

import json
import logging
import os
import queue
import threading
from datetime import datetime

from ai_studio import call_coze_article_workflow, call_coze_all_article_workflow
from images_edit import download_and_process_images
# Star imports kept in their original order on purpose: a later module's names
# intentionally shadow an earlier module's on collision.
from utils import *
from get_web_content import *
from config import *
from plagiarismdetecto import FinalPlagiarismDetector

# Supported source sites: URL prefix -> display name used in log messages.
# The WeChat prefix is truncated ("qq.co"); as a prefix test it still matches
# "https://mp.weixin.qq.com/..." — kept as-is to preserve matching behavior.
_SITE_PREFIXES = {
    "https://www.toutiao.com": "头条",
    "https://mp.weixin.qq.co": "微信",
    "https://www.163.com": "网易",
}

# ============================== Main program ===========================


def process_link(link_info, ai_service, current_template=None, generation_type=None, app=None):
    """Process a single link end-to-end.

    Steps: scrape the source page, rewrite via the Coze workflow, derive a
    title, run optional originality / compliance checks, save the article
    text, update the GUI list, and download the article images.

    Args:
        link_info: ``(link, article_type)`` tuple/list, or a bare link value.
        ai_service: Name of the AI backend; only ``"coze"`` is handled here.
        current_template: Optional dict temporarily overriding the Coze
            workflow config (restored afterwards in a ``finally`` block).
        generation_type: ``"短篇"`` or ``"文章"`` — selects the workflow and
            how the title is derived.
        app: Optional GUI object with an ``update_article_info_list`` method.

    Raises:
        Exception: Any failure is logged and re-raised so the calling worker
        can record it in the result queue.
    """
    # Normalize link_info into (link, article_type).
    if isinstance(link_info, (tuple, list)) and len(link_info) >= 2:
        link, article_type = link_info
    else:
        link = str(link_info) if not isinstance(link_info, str) else link_info
        article_type = generation_type or "未分类"

    # State initialized up-front so the finally/except paths can rely on it.
    original_config = None
    message_content = ""
    title = ""

    try:
        # --- Scrape the source page (with retry / fallback inside helper) ---
        site_name = None
        if isinstance(link, str):
            for prefix, name in _SITE_PREFIXES.items():
                if link.startswith(prefix):
                    site_name = name
                    break
        if site_name is not None:
            logger.info(f"正在提取{site_name}文章内容: {link}")
            title_text, article_text, img_urls = extract_content_with_retry(link, max_retries=3)
            logger.info(f"提取完成,标题长度: {len(title_text)}, 内容长度: {len(article_text)}, 图片数: {len(img_urls)}")
        else:
            logger.warning(f"不支持的链接格式: {link}")
            title_text, article_text, img_urls = "", "", []

        # --- Validate the scrape result before spending AI calls on it ---
        if not title_text:
            logger.warning(f"文章标题为空,跳过处理: {link}")
            return
        elif len(title_text) > 100:
            logger.warning(f"文章标题过长,跳过处理: {link}")
            return

        # Skip articles shorter than the configured minimum length.
        min_article_length = int(CONFIG['General'].get('min_article_length', '100'))
        if len(article_text) < min_article_length:
            logger.warning(f"文章字数 {len(article_text)} 低于最小阈值 {min_article_length},跳过处理: {link}")
            return

        # Fallback title derived from the body; may be replaced by the AI
        # output below.
        title = extract_content_until_punctuation(article_text).replace("正文:", "")

        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)

        if ai_service == "coze":
            logger.info("coze正在处理")
            logger.info(f"正在处理的文章类型为:{generation_type}")

            # Temporarily apply a per-template Coze config; the original
            # values are restored in the finally block below.
            if current_template:
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async'],
                }
                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
                logger.info(f"应用模板配置: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")

            try:
                input_data_template_str = CONFIG['Coze'].get('input_data_template')
                if not input_data_template_str:
                    raise ValueError("input_data_template 配置缺失")
                # Parsed purely as configuration validation (raises on bad
                # JSON); the workflow calls build their own payloads below.
                json.loads(input_data_template_str)

                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("coze中输入:", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("原文中标题为:", title_text)
                    print("原文中内容为:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("发送的请求数据为:", input_data)
                    try:
                        result = call_coze_all_article_workflow(input_data)
                        # The workflow reports failures as {'error': ...}.
                        if isinstance(result, dict) and 'error' in result:
                            raise Exception(result['error'])
                        title, message_content = result
                    except Exception as e:
                        logger.error(f"调用 Coze 工作流时出错: {e}")
                        raise
            finally:
                # Always restore the global Coze config, even on failure.
                if original_config is not None:
                    for key, value in original_config.items():
                        CONFIG['Coze'][key] = str(value)

        # --- Normalize the AI output to a string ---
        if not isinstance(message_content, str):
            logger.warning(f"message_content 类型异常: {type(message_content)}")
            message_content = str(message_content) if message_content else ""
        logger.info(f"message_content处理前类型: {type(message_content)}, 长度: {len(message_content)}")
        if message_content:
            logger.info(f"message_content前300字符: {message_content[:300]}")

        # Strip surrounding whitespace from the scraped title.
        title_text = title_text.strip()

        # --- Derive the final title from the AI output ---
        if generation_type == "短篇":
            logger.info(f"message_content原始内容长度: {len(message_content) if message_content else 0}")
            logger.info(f"message_content前200字符: {message_content[:200] if message_content else '空'}")
            if isinstance(message_content, str) and "\n" in message_content:
                first_line, rest_content = message_content.split("\n", 1)
                logger.info(f"第一行内容: {first_line}")
                if len(first_line) < 100:
                    # A short first line is treated as the title; the rest is
                    # the article body.
                    title = first_line.strip().rstrip(',。!?;:')
                    message_content = rest_content
                    logger.info(f"使用第一行作为标题: {title}")
                else:
                    title = extract_content_until_punctuation(message_content).replace("正文:", "")
                    logger.info(f"第一行过长,使用extract提取: {title}")
            else:
                title = extract_content_until_punctuation(message_content).replace("正文:", "")
                logger.info(f"无换行,使用extract提取: {title}")
            logger.info(f"最终title: '{title}'")
            logger.info(f"title长度: {len(title)}")
        elif generation_type == "文章":
            # "文章" mode: the workflow already returned the title above.
            pass

        # --- Save the article under a per-type directory ---
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)

        file_name = ""
        title_to_use = title.strip() if title else ""
        logger.info(f"用于文件名的title: '{title_to_use}', 长度: {len(title_to_use)}")
        # Both generation types resolve the file name the same way.
        if generation_type in ("短篇", "文章"):
            file_name = handle_duplicate_files_advanced(type_dir, title_to_use)[0]
        logger.info(f"生成的file_name: '{file_name}'")
        article_save_path = os.path.join(type_dir, f"{file_name}.txt")

        # Strip Markdown code fences and prepend the title line.
        if "```" in message_content:
            message_content = message_content.replace("```", "")
        message_content = str(title) + "\n" + str(message_content)

        # Stats reported to the GUI.
        original_word_count = len(article_text)
        image_count = len(img_urls)
        rewritten_word_count = len(message_content)

        # --- Optional originality (plagiarism) check ---
        enable_plagiarism_detection = CONFIG['General'].get('enable_plagiarism_detection', 'false').lower() == 'true'
        similarity_score = None
        if enable_plagiarism_detection:
            print("正在进行原创度检测")
            try:
                detector = FinalPlagiarismDetector()
                result = detector.analyze_similarity(article_text, message_content)
                similarity_score = result['overall_similarity']
                print(f"原创度检测完成,相似度: {similarity_score:.2%}")
            except Exception as e:
                # Best-effort check: failure must not abort the pipeline.
                logger.error(f"原创度检测出错: {e}")
                similarity_score = None
        else:
            print("原创度检测已禁用,跳过检测")

        # --- Optional compliance (violation) check ---
        enable_detection = CONFIG['Baidu'].get('enable_detection', 'false').lower() == 'true'
        save_violation_articles = CONFIG['Baidu'].get('save_violation_articles', 'true').lower() == 'true'
        is_violation = False
        if enable_detection:
            print("正在检测文章合规度")
            detection_result = text_detection(message_content)
            if detection_result == "合规":
                print("文章合规")
                is_violation = False
            else:
                print(f"文章不合规: {detection_result}")
                is_violation = True
                if not save_violation_articles:
                    # Discard the article, but still report it to the GUI.
                    if app:
                        display_title = title
                        app.update_article_info_list(display_title, original_word_count, image_count, rewritten_word_count, True, similarity_score)
                    return
        else:
            print("违规检测已禁用,跳过检测")
            is_violation = False

        # Violating articles that are kept get a marker in the file name.
        if is_violation and save_violation_articles:
            name_part, ext_part = os.path.splitext(article_save_path)
            article_save_path = f"{name_part}_已违规{ext_part}"

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
        logger.info('文本已经保存')

        # Update the GUI's article-info list.
        if app:
            display_title = title
            app.update_article_info_list(display_title, original_word_count, image_count, rewritten_word_count, is_violation, similarity_score)

        # --- Download and process the article images ---
        if img_urls:
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)
    except Exception as e:
        logger.error(f"处理链接 {link} 时出错: {e}")
        raise


def link_to_text(num_threads=None, ai_service="coze", current_template=None, generation_type=None, app=None):
    """Read links from the Excel sheet, process them concurrently, and append
    successfully processed links to ``use_link_path.txt``.

    NOTE(review): ``use_link_path.txt`` is appended to but never read here, so
    previously processed links are not actually skipped — confirm whether
    dedup against this file was intended.

    Returns:
        list: ``((link, article_type), success: bool, error: str | None)``
        tuples, one per processed link; ``[]`` when there is nothing to do.
    """
    use_link_path = 'use_link_path.txt'

    # Read the links and attach an article type to each one.
    links = read_excel(TITLE_BASE_PATH)
    logger.info(f"总共{len(links)}个链接")  # log the total once, not per link
    filtered_links = []
    for link_info in links:
        link = link_info[0].strip()
        article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
        filtered_links.append((link, article_type))

    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    # Fan the links out over the worker threads.
    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template, generation_type, app)

    # Record successfully processed links.
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for result in results:
            # Workers always emit 3-tuples; guard on exactly 3 so the unpack
            # below cannot raise (the original >= 2 check allowed 2-tuples
            # through into a 3-way unpack).
            if isinstance(result, tuple) and len(result) == 3:
                link_info, success, _ = result
                if isinstance(link_info, (tuple, list)):
                    link = link_info[0]
                else:
                    link = link_info
                if success:
                    f.write(str(link) + "\n")
            else:
                logger.warning(f"意外的结果格式: {result}")
    return results


# Shared task and result queues for the worker pool.
task_queue = queue.Queue()
result_queue = queue.Queue()
# Global pause gate: workers block on this between tasks while it is cleared.
pause_event = threading.Event()
pause_event.set()


def worker(ai_service, current_template=None, generation_type=None, app=None):
    """Worker-thread loop: pull links from ``task_queue``, process each via
    :func:`process_link`, and push ``(link_info, success, error)`` onto
    ``result_queue``. A ``None`` task is the shutdown sentinel.
    """
    while True:
        try:
            # Honor the global pause gate before taking the next task.
            pause_event.wait()
            link_info = task_queue.get()
            if link_info is None:
                break  # shutdown sentinel
            link, article_type = link_info
            try:
                logger.info(f"开始处理链接:{link}")
                process_link((link, article_type), ai_service, current_template, generation_type, app)
                result_queue.put(((link, article_type), True, None))
            except Exception as e:
                result_queue.put(((link, article_type), False, str(e)))
                logger.error(f"处理链接 {link} 时出错: {e}")
            task_queue.task_done()
        except Exception as e:
            # Keep the worker alive on unexpected errors.
            logger.error(f"工作线程出错: {e}")


def process_links_with_threads(links, num_threads=None, ai_service="coze", current_template=None, generation_type=None, app=None):
    """Process ``links`` on a pool of worker threads and collect the results.

    Args:
        links: Iterable of ``(link, article_type)`` tuples.
        num_threads: Pool size; capped by ``MAX_THREADS`` and ``len(links)``.

    Returns:
        list: Everything the workers put on ``result_queue``.
    """
    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(num_threads, MAX_THREADS, len(links))

    # Drain any leftovers from a previous run.
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()

    # Start the pool.
    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service, current_template, generation_type, app))
        t.daemon = True
        t.start()
        threads.append(t)

    # Enqueue the work, then one shutdown sentinel per worker.
    for link in links:
        task_queue.put(link)
    for _ in range(num_threads):
        task_queue.put(None)

    for t in threads:
        t.join()

    # Collect the results (all workers have exited, so the queue is stable).
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results