ArticleReplaceBatch/ArticleReplaceBatch/main_process_wtt.py

297 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow, call_coze_workflow,call_coze_article_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ============================== Main program ===========================
def _extract_article(link):
    """Fetch ``(title, body, image_urls)`` for *link* with the extractor matching its site.

    Links from unsupported sites yield ``("", "", [])``.
    """
    if link.startswith("https://www.toutiao.com"):
        title_text, article_text, img_urls = toutiao_w_extract_content(link)
        if title_text == "":
            # The first Toutiao extractor returned no title; retry with the other one.
            title_text, article_text, img_urls = toutiao_extract_content(link)
        return title_text, article_text, img_urls
    if link.startswith("https://mp.weixin.qq.co"):
        return wechat_extract_content(link)
    if link.startswith("https://www.163.com"):
        return wangyi_extract_content(link)
    return "", "", []


def _fill_placeholders(template, values):
    """Return a copy of *template* with ``{name}`` tokens in its string values replaced.

    Plain ``str.replace`` is used instead of ``str.format`` so that stray braces
    in a user-supplied template cannot raise.  Tokens whose name is not in
    *values* and non-string values are left untouched.
    """
    filled = {}
    for key, value in template.items():
        if isinstance(value, str):
            for name, replacement in values.items():
                value = value.replace("{" + name + "}", replacement)
        filled[key] = value
    return filled


def process_link(link_info, ai_service, current_template=None):
    """Rewrite one article with the selected AI service and save the result.

    :param link_info: ``(link, article_type)`` pair; ``article_type`` names the
        sub-directory used for the saved text and images.
    :param ai_service: AI provider, either ``"dify"`` or ``"coze"``.
    :param current_template: optional mapping (only used for ``"coze"``) whose
        ``workflow_id``, ``access_token``, ``is_async`` and
        ``input_data_template`` entries temporarily override ``CONFIG['Coze']``
        for the duration of the call.
    :return: ``None``.  The function returns early without saving anything when
        the title is empty or longer than 100 characters, when the Dify path
        finds banned keywords, or when ``text_detection`` does not report
        ``"合规"``.
    :raises ValueError: if ``ai_service`` is not a supported provider.
    :raises Exception: any other failure is logged and re-raised.
    """
    from datetime import datetime

    link, article_type = link_info
    try:
        title_text, article_text, img_urls = _extract_article(link)
        if title_text == "" or len(title_text) > 100:
            return

        # Banned-keyword check; note that it is run on the title.
        check_keywords = check_keywords_in_text(title_text)
        print(img_urls)
        print(article_text)
        print("当前时间:", datetime.now().strftime("%H:%M:%S"))

        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                db = CONFIG['Database']
                check_link_insert(db['host'], db['user'], db['password'], db['database'], link)
                return
            # The default must itself be valid JSON (single braces), otherwise
            # json.loads always fails and the error fallback below is taken.
            input_data_template_str = CONFIG['Dify'].get(
                'input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text)
                              for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError,
                    IndexError, ValueError) as e:
                # IndexError / ValueError come from str.format on stray braces.
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {"old_article": article_text}
            message_content = call_dify_workflow(input_data)
        elif ai_service == "coze":
            logger.info("coze正在处理")
            coze_config = CONFIG['Coze']
            original_config = None
            try:
                if current_template:
                    # NOTE(review): CONFIG is shared by every worker thread and
                    # this override/restore is not synchronized — confirm that
                    # concurrent runs always use the same template.
                    original_config = {
                        'workflow_id': coze_config['workflow_id'],
                        'access_token': coze_config['access_token'],
                        'is_async': coze_config['is_async'],
                        'input_data_template': coze_config.get('input_data_template', ''),
                    }
                    coze_config['workflow_id'] = current_template.get('workflow_id', '')
                    coze_config['access_token'] = current_template.get('access_token', '')
                    coze_config['is_async'] = current_template.get('is_async', 'true')
                    coze_config['input_data_template'] = current_template.get('input_data_template', '')
                    logger.info(f"应用模板配置: {current_template.get('name')}")
                    logger.info(f"Workflow ID: {coze_config['workflow_id']}")
                    # Only the token length is logged, never the token itself.
                    logger.info(f"Access Token: {'*' * len(coze_config['access_token'])}")
                    logger.info(f"Is Async: {coze_config['is_async']}")
                    logger.info(f"Input Template: {coze_config['input_data_template']}")
                try:
                    input_data_template_str = coze_config.get(
                        'input_data_template',
                        '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}')
                    input_data_template = json.loads(input_data_template_str)
                    # Fill in the placeholders this function can supply.
                    # NOTE(review): "{weijin}" is left as-is because the value
                    # the workflow expects for it is not visible here.
                    input_data = _fill_placeholders(
                        input_data_template,
                        {"article_text": article_text, "link": link})
                except (json.JSONDecodeError, KeyError, AttributeError) as e:
                    logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.")
                    input_data = {"article": article_text}
                message_content = call_coze_article_workflow(input_data)
            finally:
                # Put the shared configuration back even if anything above failed.
                if original_config is not None:
                    coze_config['workflow_id'] = original_config['workflow_id']
                    coze_config['access_token'] = original_config['access_token']
                    coze_config['is_async'] = original_config['is_async']
                    coze_config['input_data_template'] = original_config['input_data_template']
        else:
            # Fail fast instead of hitting an unbound ``message_content`` later.
            raise ValueError(f"Unsupported ai_service: {ai_service!r}")

        print("当前时间:", datetime.now().strftime("%H:%M:%S"))

        file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH, title_text)[0]
        # The article is stored in a per-type directory.
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)
        article_save_path = os.path.join(type_dir, f"{file_name}.txt")

        # Strip Markdown-style markers left in the AI output.
        content = re.sub(r'[*#-]', '', message_content)
        print("改写完成的文章:" + content)

        # Only text that passes the compliance check is written to disk.
        if text_detection(content) != "合规":
            print("文章不合规")
            return
        print("文章合规")

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(content)
        logging.info('文本已经保存')

        if img_urls:
            # Images go to a per-type directory as well.
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            download_and_process_images(img_urls, file_name, type_picture_dir)
    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
def link_to_text(num_threads=None, ai_service="dify", current_template=None):
    """Load the links from the spreadsheet, process them in threads and record successes.

    Rows come from ``read_excel(TITLE_BASE_PATH)``; index 0 of each row is used
    as the link and index 1 as the article type.  The URL of every successfully
    processed task is appended to ``use_link_path.txt``.

    :param num_threads: forwarded to ``process_links_with_threads``.
    :param ai_service: forwarded to ``process_links_with_threads``.
    :param current_template: forwarded to ``process_links_with_threads``.
    :return: the result tuples ``(task, success, error)`` returned by
        ``process_links_with_threads``, or ``[]`` when there is nothing to do.
    """
    use_link_path = 'use_link_path.txt'
    links = read_excel(TITLE_BASE_PATH)
    # Log the total once instead of once per row.
    logging.info(f"总共{len(links)}个链接")

    # NOTE: the database "already processed" lookup is currently disabled, so
    # every row is queued.
    filtered_links = [
        (link_info[0].strip(), link_info[1].strip())
        for link_info in links
    ]
    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template)

    with open(use_link_path, 'a', encoding='utf-8') as f:
        for task, success, _ in results:
            if not success:
                continue
            # Results echo the queued task, which is a (link, article_type)
            # tuple; concatenating it with "\n" would raise TypeError.
            url = task[0] if isinstance(task, (tuple, list)) else task
            f.write(url + "\n")
    return results
# Shared queues: `task_queue` carries the work items (and the None stop
# signals) to the worker threads, `result_queue` carries their
# (task, success, error) tuples back.
task_queue, result_queue = queue.Queue(), queue.Queue()
# Worker thread entry point.
def worker(ai_service, current_template=None):
    """Consume tasks from ``task_queue`` until a ``None`` stop signal arrives.

    Every task is passed to ``process_link``.  The outcome is reported on
    ``result_queue`` as ``(task, True, None)`` when ``process_link`` returns and
    as ``(task, False, error_message)`` when it raises.

    :param ai_service: forwarded unchanged to ``process_link``.
    :param current_template: forwarded unchanged to ``process_link``.
    """
    while True:
        try:
            link_info = task_queue.get()
            try:
                if link_info is None:  # stop signal
                    break
                try:
                    logger.info(f"开始处理链接:{link_info}")
                    process_link(link_info, ai_service, current_template)
                    result_queue.put((link_info, True, None))  # success
                except Exception as e:
                    result_queue.put((link_info, False, str(e)))  # failure
                    logger.error(f"处理链接 {link_info} 时出错: {e}")
            finally:
                # Every get() is matched by exactly one task_done(), including
                # the stop signal and the case where reporting itself fails.
                task_queue.task_done()
        except Exception as e:
            logger.error(f"工作线程出错: {e}")
# Process the links with a pool of worker threads.
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None):
    """Run ``worker`` threads over *links* and collect their results.

    :param links: tasks to enqueue; each one is handed to a worker as-is.
    :param num_threads: requested number of threads.  It is capped by
        ``MAX_THREADS`` and by ``len(links)``; ``None`` means "as many as
        allowed".  Non-positive values are raised to 1 so that the queued
        links are never silently dropped.
    :param ai_service: forwarded to every worker.
    :param current_template: forwarded to every worker.
    :return: list of the tuples the workers put on ``result_queue``.
    """
    thread_count = min(MAX_THREADS, len(links))
    if num_threads is not None:
        thread_count = min(num_threads, thread_count)
    if len(links) > 0:
        # With zero workers the tasks below would sit in the queue forever
        # and the caller would just get an empty result list.
        thread_count = max(1, thread_count)
    else:
        thread_count = 0

    # Drop anything left over from a previous run.
    for leftover in (task_queue, result_queue):
        while not leftover.empty():
            leftover.get()

    # Start the workers, passing the AI service and template along.
    threads = []
    for _ in range(thread_count):
        t = threading.Thread(target=worker, args=(ai_service, current_template), daemon=True)
        t.start()
        threads.append(t)

    for link in links:
        task_queue.put(link)
    # One stop signal per worker.
    for _ in range(thread_count):
        task_queue.put(None)

    for t in threads:
        t.join()

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results