import json import requests from config import * # ==========================调用coze工作流========================== def call_coze_workflow(parameters): """ 调用 Coze 工作流的函数 :param parameters: 传递给工作流的输入参数(字典格式) :return: 工作流的执行结果 """ logger.info("Coze开始工作。。。。") workflow_id = CONFIG['Coze']['workflow_id'] access_token = CONFIG['Coze']['access_token'] is_async = CONFIG['Coze']['is_async'].lower() == 'true' url = "https://api.coze.cn/v1/workflow/run" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } data = { "workflow_id": workflow_id, "parameters": parameters, "is_async": is_async } response = requests.post(url, json=data, headers=headers) if response.status_code == 200: # data = json.loads(response.text)['data'] # print("data:",data['output']) return response.text else: return { "error": f"请求失败,状态码:{response.status_code}", "detail": response.text } def call_coze_article_workflow(parameters): """ 调用 Coze 工作流的函数 :param parameters: 传递给工作流的输入参数(字典格式) :return: 工作流的执行结果 """ workflow_id = CONFIG['Coze']['workflow_id'] access_token = CONFIG['Coze']['access_token'] is_async = CONFIG['Coze']['is_async'].lower() == 'true' url = "https://api.coze.cn/v1/workflow/run" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } data = { "workflow_id": workflow_id, "parameters": parameters, "is_async": is_async } response = requests.post(url, json=data, headers=headers) if response.status_code == 200: try: # 解析响应 result_dict = json.loads(response.text) print(result_dict) # 获取data字段 data_str = result_dict['data'] # 如果data是字符串,尝试解析它 if isinstance(data_str, str): data_dict = json.loads(data_str) else: data_dict = data_str # 获取output的值 output_value = data_dict.get('article', '') return output_value except Exception as e: return { "error": f"解析响应时出错:{str(e)}", "detail": response.text } else: return { "error": f"请求失败,状态码:{response.status_code}", "detail": response.text } def call_coze_all_article_workflow(parameters,is_async=False): """ 调用 Coze 工作流的函数 :param parameters: 传递给工作流的输入参数(字典格式) :param is_async: 是否异步执行(默认 False) :return: 工作流的执行结果 """ workflow_id = CONFIG['Coze']['workflow_id'] access_token = CONFIG['Coze']['access_token'] is_async = CONFIG['Coze']['is_async'].lower() == 'true' # 修复异步参数逻辑 url = "https://api.coze.cn/v1/workflow/run" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } data = { "workflow_id": workflow_id, "parameters": parameters, "is_async": is_async } response = requests.post(url, json=data, headers=headers) if response.status_code == 200: try: # 解析响应 result_dict = json.loads(response.text) print(result_dict) # 获取data字段 data_str = result_dict['data'] # 如果data是字符串,尝试解析它 if isinstance(data_str, str): data_dict = json.loads(data_str) else: data_dict = data_str # 获取output的值 title = data_dict.get('title', '') article = data_dict.get('article', '') return title, article except Exception as e: return { "error": f"解析响应时出错:{str(e)}", "detail": response.text } else: return { "error": f"请求失败,状态码:{response.status_code}", "detail": response.text }