diff --git a/ArticleReplaceBatch/ai_studio.py b/ArticleReplaceBatch/ai_studio.py
index 913c285..2083bb7 100644
--- a/ArticleReplaceBatch/ai_studio.py
+++ b/ArticleReplaceBatch/ai_studio.py
@@ -13,6 +13,7 @@ def call_dify_workflow(input_data):
     :param input_data: 传递给工作流的输入数据
     :return: 工作流的输出结果
     """
+    logger.info("Dify开始工作。。。")
     api_key = CONFIG['Dify']['api_key']
     user_id = CONFIG['Dify']['user_id']
     url = CONFIG['Dify']['url']
@@ -40,17 +41,18 @@ def call_dify_workflow(input_data):
 
 
 # ==========================调用coze工作流==========================
-def call_coze_workflow(workflow_id, access_token, parameters,is_async=False):
+def call_coze_workflow(parameters):
     """
     调用 Coze 工作流的函数
-    :param workflow_id: Coze 工作流 ID
-    :param access_token: 个人访问令牌(Access Token)
     :param parameters: 传递给工作流的输入参数(字典格式)
-    :param app_id: 应用 ID(可选)
-    :param is_async: 是否异步执行(默认 False)
     :return: 工作流的执行结果
     """
+    logger.info("Coze开始工作。。。。")
+    workflow_id = CONFIG['Coze']['workflow_id']
+    access_token = CONFIG['Coze']['access_token']
+    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
+
     url = "https://api.coze.cn/v1/workflow/run"
     headers = {
         "Authorization": f"Bearer {access_token}",
diff --git a/ArticleReplaceBatch/config.py b/ArticleReplaceBatch/config.py
index 663fe05..ae2c486 100644
--- a/ArticleReplaceBatch/config.py
+++ b/ArticleReplaceBatch/config.py
@@ -15,6 +15,11 @@ DEFAULT_CONFIG = {
         "title_file": "文章链接.xlsx",
         "max_threads": "3"
     },
+    "Coze": {
+        "workflow_id": "",
+        "access_token": "",
+        "is_async": "false"
+    },
     "Database": {
         "host": "27.106.125.150",
         "user": "root",
diff --git a/ArticleReplaceBatch/main_process.py b/ArticleReplaceBatch/main_process.py
index d800280..3228d48 100644
--- a/ArticleReplaceBatch/main_process.py
+++ b/ArticleReplaceBatch/main_process.py
@@ -1,19 +1,18 @@
-
 import threading
 import queue
-
+import json  # 导入 json 模块
 from ai_studio import call_dify_workflow, call_coze_workflow
 from databases import *
-
 from images_edit import download_and_process_images
 from utils import *
-from get_web_content import *
+from get_web_content import *
 from config import *
 
+
 # ==============================主程序===========================
-def process_link(link, ai_service="dify"):
+def process_link(link, ai_service):
     """
     处理单个链接
     :param link: 要处理的链接
@@ -31,8 +30,10 @@ def process_link(link, ai_service="dify"):
 
     if title_text == "":
         return
+    elif len(title_text) > 100:
+        return
 
-    # 获取数据库配置
+    # 获取数据库配置
     host = CONFIG['Database']['host']
     user = CONFIG['Database']['user']
     password = CONFIG['Database']['password']
@@ -41,11 +42,6 @@ def process_link(link, ai_service="dify"):
     # 判断文章内容是否有违禁词
     check_keywords = check_keywords_in_text(title_text)
 
-    if check_keywords:
-        print("文章中有违禁词!")
-        check_link_insert(host, user, password, database, link)
-        return
-
     title = extract_content_until_punctuation(article_text).replace("正文:", "")
     print(title)
 
@@ -59,20 +55,55 @@ def process_link(link, ai_service="dify"):
     # 打印当前时间
     print("当前时间:", current_time)
 
-    input_data = {
-        "old_article": article_text
-    }
-
     if ai_service == "dify":
-        input_data = {
-            "old_article": article_text
-        }
+        if check_keywords:
+            print("文章中有违禁词!")
+            check_link_insert(host, user, password, database, link)
+            return
+        # 从配置加载 input_data 模板
+        input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
+        try:
+            # 解析模板字符串为字典
+            input_data_template = json.loads(input_data_template_str)
+            # 使用实际变量格式化模板
+            input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
+        except (json.JSONDecodeError, KeyError, AttributeError) as e:
+            logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
+            input_data = {
+                "old_article": article_text
+            }
+
+        # input_data = {
+        #     "old_article": article_text
+        # }
         message_content = call_dify_workflow(input_data)
     elif ai_service == "coze":
-        input_data = {
-            "old_article": article_text
-        }
-        message_content = call_coze_workflow(input_data)
+        logger.info("coze正在处理")
+        weijin = ""
+        if check_keywords:
+            weijin = "违禁"
+        # 从配置加载 Coze input_data 模板
+        input_data_template_str = CONFIG['Coze'].get('input_data_template',
+                                                     '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}')
+        try:
+            # 解析模板字符串为字典
+            input_data_template = json.loads(input_data_template_str)
+            # 使用实际变量格式化模板
+            input_data = {k: v.format(article_text=article_text, link=link, weijin=weijin) for k, v in
+                          input_data_template.items()}
+        except (json.JSONDecodeError, KeyError, AttributeError) as e:
+            logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.")
+            input_data = {
+                "article": article_text,
+                "link": link,
+                "weijin": weijin
+            }
+
+        msg = call_coze_workflow(input_data)
+        message_content = msg['article']
+        result = msg['result']
+        if result == "已经创作过":
+            return
 
     # 获取当前时间并格式化
     current_time = datetime.now().strftime("%H:%M:%S")
@@ -117,7 +148,6 @@ def process_link(link, ai_service="dify"):
         raise
 
 
-
 def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"):
 
     use_link_path = 'use_link_path.txt'
@@ -139,14 +169,14 @@ def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"
             else:
                 filtered_links.append(link)
                 logger.info(f"链接不存在: {link}")
-                print("链接不存在,存储到过滤器中:",link)
+                print("链接不存在,存储到过滤器中:", link)
 
     if not filtered_links:
         logger.info("没有新链接需要处理")
         return []
 
     # 使用多线程处理链接
-    results = process_links_with_threads(filtered_links, num_threads)
+    results = process_links_with_threads(filtered_links, num_threads, ai_service)
 
     # 记录已处理的链接
     with open(use_link_path, 'a+', encoding='utf-8') as f:
@@ -163,7 +193,7 @@ result_queue = queue.Queue()
 
 
 # 工作线程函数
-def worker():
+def worker(ai_service):
     while True:
         try:
             # 从队列中获取任务
@@ -187,12 +217,12 @@ def worker()
 
 # 多线程处理链接
-def process_links_with_threads(links, num_threads=None):
+def process_links_with_threads(links, num_threads=None, ai_service="dify"):
     if num_threads is None:
         num_threads = min(MAX_THREADS, len(links))
     else:
         num_threads = min(num_threads, MAX_THREADS, len(links))
-    
+
     # 清空任务队列和结果队列
     while not task_queue.empty():
         task_queue.get()
@@ -201,8 +231,10 @@ def process_links_with_threads(links, num_threads=None):
 
     # 创建工作线程
     threads = []
+
+    # 将AI服务选择传递给worker函数
     for _ in range(num_threads):
-        t = threading.Thread(target=worker)
+        t = threading.Thread(target=worker, args=(ai_service,))
         t.daemon = True
         t.start()
         threads.append(t)
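
Note on the reworked call_coze_workflow: the diff now reads workflow_id, access_token and is_async from the [Coze] config section instead of taking them as arguments, and posts to https://api.coze.cn/v1/workflow/run with a Bearer token. A minimal sketch of such a call follows. Only the URL and the Authorization header are visible in the diff; the request field names (workflow_id, parameters, is_async) and the handling of the response's "data" field are assumptions based on the public Coze workflow-run API and should be verified against the Coze docs. process_link above then expects the returned dict to carry the workflow's own 'article' and 'result' outputs.

import json

import requests


def call_coze_workflow_sketch(parameters, workflow_id, access_token, is_async=False):
    """Hypothetical sketch of a synchronous Coze workflow run, not the project's exact code."""
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    payload = {
        "workflow_id": workflow_id,   # assumed request field names
        "parameters": parameters,     # e.g. {"article": ..., "link": ..., "weijin": ...}
        "is_async": is_async,
    }
    resp = requests.post(url, headers=headers, json=payload, timeout=300)
    resp.raise_for_status()
    data = resp.json().get("data")
    # The synchronous run is expected to carry the workflow output in "data",
    # often as a JSON-encoded string; adjust once verified against the Coze docs.
    return json.loads(data) if isinstance(data, str) else data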
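
The config-driven input_data templates in process_link work by parsing a JSON string from the config and then filling each value with str.format. Below is a small self-contained sketch of that expansion; build_input_data is a hypothetical helper, not part of the codebase. The template has to be valid JSON, so the object itself uses single braces while the placeholders keep the {name} form that format expects.

import json
import logging

logger = logging.getLogger(__name__)


def build_input_data(template_str, fallback, **variables):
    """Parse a JSON template string and fill its {placeholders} via str.format."""
    try:
        template = json.loads(template_str)
        return {k: v.format(**variables) for k, v in template.items()}
    except (json.JSONDecodeError, KeyError, AttributeError) as exc:
        logger.error("input_data template error: %s, falling back to default", exc)
        return fallback


# Example: a Coze-style template like the one used by process_link, written as valid JSON
coze_template = '{"article": "{article_text}", "link": "{link}", "weijin": "{weijin}"}'
print(build_input_data(
    coze_template,
    {"article": "", "link": "", "weijin": ""},
    article_text="rewritten article body",
    link="https://example.com/post/1",
    weijin="",
))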
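
Finally, the threading change simply forwards the chosen ai_service to every worker through threading.Thread(..., args=(ai_service,)). Here is a compact, runnable stand-in for the queue-plus-workers pattern; the names (process_link, worker, task_queue, result_queue) follow the diff, but the bodies are simplified placeholders that only show how the argument flows.

import queue
import threading

task_queue = queue.Queue()
result_queue = queue.Queue()


def process_link(link, ai_service):
    # Stand-in for the real per-link pipeline (fetch, rewrite via Dify/Coze, publish).
    return f"{ai_service}:{link}"


def worker(ai_service):
    while True:
        try:
            link = task_queue.get_nowait()   # stop once the queue is drained
        except queue.Empty:
            break
        try:
            result_queue.put(process_link(link, ai_service))
        finally:
            task_queue.task_done()


def process_links_with_threads(links, num_threads, ai_service="dify"):
    for link in links:
        task_queue.put(link)
    threads = [threading.Thread(target=worker, args=(ai_service,), daemon=True)
               for _ in range(min(num_threads, len(links)))]
    for t in threads:
        t.start()
    task_queue.join()                        # wait until every queued link is processed
    return [result_queue.get() for _ in range(result_queue.qsize())]


print(process_links_with_threads(["https://example.com/a", "https://example.com/b"], 2, "coze"))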