# Source: ArticleReplaceBatch/main_process.py
# (350 lines, 14 KiB, Python — exported 2026-03-25 15:17:18 +08:00)
import threading
import queue
import json
from ai_studio import call_coze_article_workflow, call_coze_all_article_workflow
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
from plagiarismdetecto import FinalPlagiarismDetector
# ============================== Main program ===========================
def process_link(link_info, ai_service, current_template=None, generation_type=None, app=None):
    """Fetch one article link, rewrite it through the Coze AI workflow, run
    optional originality/compliance checks, and save the result to disk.

    Args:
        link_info: A (link, article_type) pair, or a bare link value.
        ai_service: AI backend id; only "coze" triggers the rewrite workflows.
        current_template: Optional dict temporarily overriding the global Coze
            config (workflow_id / access_token / is_async); restored in `finally`.
        generation_type: "短篇" (short piece) or "文章" (full article); also used
            as the fallback article type.
        app: Optional GUI object; receives per-article stats when provided.

    Raises:
        Exception: re-raises any processing error after logging it.
    """
    # Normalize link_info: accept either a (link, type) pair or a bare link.
    if isinstance(link_info, (tuple, list)) and len(link_info) >= 2:
        link, article_type = link_info
    else:
        link = str(link_info) if not isinstance(link_info, str) else link_info
        article_type = generation_type or "未分类"
    # Initialize state.
    original_config = None
    message_content = ""
    title = ""
    try:
        # Extraction helper supports retries and multiple fallback strategies.
        if isinstance(link, str) and link.startswith("https://www.toutiao.com"):
            logger.info(f"正在提取头条文章内容: {link}")
            title_text, article_text, img_urls = extract_content_with_retry(link, max_retries=3)
            logger.info(f"提取完成,标题长度: {len(title_text)}, 内容长度: {len(article_text)}, 图片数: {len(img_urls)}")
        elif isinstance(link, str) and link.startswith("https://mp.weixin.qq.co"):
            logger.info(f"正在提取微信文章内容: {link}")
            title_text, article_text, img_urls = extract_content_with_retry(link, max_retries=3)
            logger.info(f"提取完成,标题长度: {len(title_text)}, 内容长度: {len(article_text)}, 图片数: {len(img_urls)}")
        elif isinstance(link, str) and link.startswith("https://www.163.com"):
            logger.info(f"正在提取网易文章内容: {link}")
            title_text, article_text, img_urls = extract_content_with_retry(link, max_retries=3)
            logger.info(f"提取完成,标题长度: {len(title_text)}, 内容长度: {len(article_text)}, 图片数: {len(img_urls)}")
        else:
            logger.warning(f"不支持的链接格式: {link}")
            title_text, article_text, img_urls = "", "", []
        # Validate the extraction result: skip empty or over-long titles.
        if not title_text:
            logger.warning(f"文章标题为空,跳过处理: {link}")
            return
        elif len(title_text) > 100:
            logger.warning(f"文章标题过长,跳过处理: {link}")
            return
        # Skip articles shorter than the configured minimum length.
        min_article_length = int(CONFIG['General'].get('min_article_length', '100'))
        if len(article_text) < min_article_length:
            logger.warning(f"文章字数 {len(article_text)} 低于最小阈值 {min_article_length},跳过处理: {link}")
            return
        # Provisional title taken from the body text (may be replaced below).
        title = extract_content_until_punctuation(article_text).replace("正文:", "")
        from datetime import datetime
        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)
        if ai_service == "coze":
            logger.info("coze正在处理")
            logger.info(f"正在处理的文章类型为:{generation_type}")
            if current_template:
                # Remember the global Coze config so it can be restored in `finally`.
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async']
                }
                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
                logger.info(f"应用模板配置: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                # Mask the token in logs.
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")
            try:
                input_data_template_str = CONFIG['Coze'].get('input_data_template')
                if not input_data_template_str:
                    raise ValueError("input_data_template 配置缺失")
                # NOTE(review): parsed but never referenced below — confirm whether
                # the workflow calls were meant to use this template.
                input_data_template = json.loads(input_data_template_str)
                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("coze中输入", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("原文中标题为:", title_text)
                    print("原文中内容为:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("发送的请求数据为:", input_data)
                    try:
                        result = call_coze_all_article_workflow(input_data)
                        # Workflow may report failure as a dict with an 'error' key.
                        if isinstance(result, dict) and 'error' in result:
                            raise Exception(result['error'])
                        title, message_content = result
                    except Exception as e:
                        logger.error(f"调用 Coze 工作流时出错: {e}")
                        raise
            finally:
                # Always restore the global Coze config overridden above.
                if original_config is not None:
                    for key, value in original_config.items():
                        CONFIG['Coze'][key] = str(value)
        # Ensure message_content is a string.
        if not isinstance(message_content, str):
            logger.warning(f"message_content 类型异常: {type(message_content)}")
            message_content = str(message_content) if message_content else ""
        logger.info(f"message_content处理前类型: {type(message_content)}, 长度: {len(message_content)}")
        if message_content:
            logger.info(f"message_content前300字符: {message_content[:300]}")
        # Trim surrounding whitespace from the extracted title.
        title_text = title_text.strip()
        # Derive the final title from the AI workflow output.
        if generation_type == "短篇":
            logger.info(f"message_content原始内容长度: {len(message_content) if message_content else 0}")
            logger.info(f"message_content前200字符: {message_content[:200] if message_content else ''}")
            if isinstance(message_content, str) and "\n" in message_content:
                first_line, rest_content = message_content.split("\n", 1)
                logger.info(f"第一行内容: {first_line}")
                if len(first_line) < 100:
                    # A short first line is treated as the title; strip trailing punctuation.
                    title = first_line.strip().rstrip(',。!?;:')
                    message_content = rest_content
                    logger.info(f"使用第一行作为标题: {title}")
                else:
                    title = extract_content_until_punctuation(message_content).replace("正文:", "")
                    logger.info(f"第一行过长使用extract提取: {title}")
            else:
                title = extract_content_until_punctuation(message_content).replace("正文:", "")
                logger.info(f"无换行使用extract提取: {title}")
            logger.info(f"最终title: '{title}'")
            logger.info(f"title长度: {len(title)}")
        elif generation_type == "文章":
            # Title was already produced by the "文章" workflow above.
            pass
        # Create the per-type output directory.
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)
        # Save the article inside the type directory.
        file_name = ""
        title_to_use = title.strip() if title else ""
        logger.info(f"用于文件名的title: '{title_to_use}', 长度: {len(title_to_use)}")
        if generation_type == '短篇':
            file_name = handle_duplicate_files_advanced(type_dir, title_to_use)[0]
        elif generation_type == "文章":
            file_name = handle_duplicate_files_advanced(type_dir, title_to_use)[0]
        logger.info(f"生成的file_name: '{file_name}'")
        article_save_path = os.path.join(type_dir, f"{file_name}.txt")
        # Strip markdown code fences, then prepend the title as the first line.
        if "```" in message_content:
            message_content = message_content.replace("```", "")
        message_content = str(title) + "\n" + str(message_content)
        # Collect article stats for the GUI.
        original_word_count = len(article_text)
        image_count = len(img_urls)
        rewritten_word_count = len(message_content)
        # Originality (plagiarism) check — enabled via config.
        enable_plagiarism_detection = CONFIG['General'].get('enable_plagiarism_detection', 'false').lower() == 'true'
        similarity_score = None
        if enable_plagiarism_detection:
            print("正在进行原创度检测")
            try:
                detector = FinalPlagiarismDetector()
                result = detector.analyze_similarity(article_text, message_content)
                similarity_score = result['overall_similarity']
                print(f"原创度检测完成,相似度: {similarity_score:.2%}")
            except Exception as e:
                # Best-effort: a failed check must not abort the pipeline.
                logger.error(f"原创度检测出错: {e}")
                similarity_score = None
        else:
            print("原创度检测已禁用,跳过检测")
        # Content-compliance check — enabled via config.
        enable_detection = CONFIG['Baidu'].get('enable_detection', 'false').lower() == 'true'
        save_violation_articles = CONFIG['Baidu'].get('save_violation_articles', 'true').lower() == 'true'
        is_violation = False
        if enable_detection:
            print("正在检测文章合规度")
            detection_result = text_detection(message_content)
            if detection_result == "合规":
                print("文章合规")
                is_violation = False
            else:
                print(f"文章不合规: {detection_result}")
                is_violation = True
                if not save_violation_articles:
                    # Report the violation to the GUI, then drop the article.
                    if app:
                        display_title = title
                        app.update_article_info_list(display_title, original_word_count, image_count, rewritten_word_count, True, similarity_score)
                    return
        else:
            print("违规检测已禁用,跳过检测")
            is_violation = False
        # Violating articles that are still saved get a marker in the filename.
        if is_violation and save_violation_articles:
            name_part, ext_part = os.path.splitext(article_save_path)
            article_save_path = f"{name_part}_已违规{ext_part}"
        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
        logging.info('文本已经保存')
        # Update the article info list in the GUI.
        if app:
            display_title = title
            app.update_article_info_list(display_title, original_word_count, image_count, rewritten_word_count, is_violation, similarity_score)
        if img_urls:
            # Mirror the per-type layout for downloaded images.
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)
    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
def link_to_text(num_threads=None, ai_service="coze", current_template=None, generation_type=None, app=None):
    """Read article links from the Excel source, process them in parallel, and
    append successfully processed links to ``use_link_path.txt``.

    Args:
        num_threads: Upper bound on worker threads (clamped by MAX_THREADS);
            None lets process_links_with_threads pick the count.
        ai_service: AI backend id forwarded to process_link.
        current_template: Optional Coze template dict forwarded to process_link.
        generation_type: Fallback article type when a row has no type column.
        app: Optional GUI object forwarded to process_link.

    Returns:
        The list of (link_info, success, error) result tuples, or [] when
        there are no links to process.
    """
    use_link_path = 'use_link_path.txt'
    # Read the links.
    links = read_excel(TITLE_BASE_PATH)
    # Fix: log the total once, not once per link (was inside the loop).
    logging.info(f"总共{len(links)}个链接")
    filtered_links = []
    for link_info in links:
        link = link_info[0].strip()
        # A non-empty per-row type column wins; otherwise fall back to the caller's type.
        article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
        filtered_links.append((link, article_type))
    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []
    # Process the links with multiple threads.
    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template, generation_type, app)
    # Record the links that were processed successfully.
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for result in results:
            if isinstance(result, tuple) and len(result) >= 2:
                # Fix: index instead of a fixed 3-way unpack so a 2-tuple
                # (which the length check admits) no longer raises ValueError.
                link_info, success = result[0], result[1]
                if isinstance(link_info, (tuple, list)):
                    link = link_info[0]
                else:
                    link = link_info
                if success:
                    f.write(str(link) + "\n")
            else:
                logger.warning(f"意外的结果格式: {result}")
    return results
# Create a task queue and a result queue shared by the worker threads.
task_queue = queue.Queue()    # holds (link, article_type) work items; None is the shutdown sentinel
result_queue = queue.Queue()  # holds (link_info, success, error) tuples reported by workers
# Global pause event: workers block in wait() while the event is cleared.
pause_event = threading.Event()
# Start in the "running" state (set = not paused).
pause_event.set()
# Worker thread function
def worker(ai_service, current_template=None, generation_type=None, app=None):
    """Daemon worker loop: pull (link, article_type) items off task_queue,
    process each via process_link, and push (link_info, success, error) onto
    result_queue.

    A None item is the shutdown sentinel. pause_event gates processing: the
    loop blocks in wait() while the event is cleared (presumably toggled by
    the GUI to pause/resume — confirm against the caller).
    """
    while True:
        try:
            # Block here while paused; continue immediately when the event is set.
            pause_event.wait()
            link_info = task_queue.get()
            if link_info is None:
                # Sentinel from process_links_with_threads: shut this worker down.
                break
            link, article_type = link_info
            try:
                logger.info(f"开始处理链接:{link}")
                process_link((link, article_type), ai_service, current_template, generation_type, app)
                result_queue.put(((link, article_type), True, None))
            except Exception as e:
                # Report the failure instead of killing the thread, so the
                # remaining queued links are still processed.
                result_queue.put(((link, article_type), False, str(e)))
                logger.error(f"处理链接 {link} 时出错: {e}")
            task_queue.task_done()
            # Re-check the pause gate before fetching the next item.
            pause_event.wait()
        except Exception as e:
            # Defensive: never let an unexpected error end the worker loop.
            logger.error(f"工作线程出错: {e}")
# Process links with multiple threads
def process_links_with_threads(links, num_threads=None, ai_service="coze", current_template=None, generation_type=None, app=None):
    """Fan the given links out to a pool of worker threads and collect results.

    The thread count is clamped to MAX_THREADS and to the number of links.
    Workers are stopped via one None sentinel per thread.

    Returns:
        The list of (link_info, success, error) tuples produced by the workers.
    """
    cap = min(MAX_THREADS, len(links))
    num_threads = cap if num_threads is None else min(num_threads, cap)
    # Drain leftovers from any previous run so stale items cannot leak in.
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()
    # Spin up the worker pool.
    pool = []
    for _ in range(num_threads):
        thread = threading.Thread(
            target=worker,
            args=(ai_service, current_template, generation_type, app),
            daemon=True,
        )
        thread.start()
        pool.append(thread)
    # Enqueue the real work, then one sentinel per worker to stop it.
    for item in links:
        task_queue.put(item)
    for _ in range(num_threads):
        task_queue.put(None)
    # Wait for every worker to finish.
    for thread in pool:
        thread.join()
    # Collect everything the workers reported.
    collected = []
    while not result_queue.empty():
        collected.append(result_queue.get())
    return collected