ArticleReplace/ai_studio.py
2025-11-29 14:37:27 +08:00

190 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
from config import *
# ========================== Call the Dify workflow ==========================
def call_dify_workflow(input_data):
    """Run the configured Dify workflow in blocking mode and return its article.

    The API key, user id and endpoint URL are read from ``CONFIG['Dify']``.
    The request body is JSON with ``inputs``, ``response_mode`` ("blocking")
    and ``user``.

    :param input_data: value sent as the workflow's ``inputs`` field
    :return: the value stored at ``data -> outputs -> article`` in the JSON
        response
    :raises ValueError: the response body is not valid JSON
        (``json.JSONDecodeError`` is a ``ValueError`` subclass)
    :raises KeyError: the JSON response has no ``data``/``outputs``/``article``
        key (presumably the case for Dify error payloads -- confirm against
        the Dify API)
    :raises TypeError: an intermediate value on that path is not subscriptable
    """
    logger.info("Dify开始工作。。。")
    api_key = CONFIG['Dify']['api_key']
    user_id = CONFIG['Dify']['user_id']
    url = CONFIG['Dify']['url']
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    payload = {
        "inputs": input_data,
        "response_mode": "blocking",
        "user": user_id,
    }
    response = requests.post(url, headers=headers, data=json.dumps(payload))
    try:
        json_data = json.loads(response.text)
        # Route the response dump through the logger instead of stdout.
        # The message is pre-formatted so it renders the same regardless of
        # which placeholder style the project logger uses.
        logger.info(f"json_data: {json_data}")
        return json_data['data']['outputs']['article']
    except (ValueError, KeyError, TypeError):
        # Without this, a failed call only shows up as e.g. KeyError('data').
        # Record what the server actually answered, then re-raise the very
        # same exception so existing callers keep working unchanged.
        logger.error(
            f"Dify workflow returned an unexpected response "
            f"(HTTP {response.status_code}): {response.text}"
        )
        raise
# ========================== Call the Coze workflow ==========================
def call_coze_workflow(parameters):
    """Run the configured Coze workflow and hand back the raw response.

    The workflow id, access token and async flag are read from
    ``CONFIG['Coze']``; the async flag is true only when the configured
    string equals "true" case-insensitively.

    :param parameters: input parameters forwarded to the workflow
    :return: the response body text when the HTTP status is 200; otherwise a
        dict with the keys ``"error"`` (message containing the status code)
        and ``"detail"`` (the response body text)
    """
    logger.info("Coze开始工作。。。。")

    coze_workflow_id = CONFIG['Coze']['workflow_id']
    coze_token = CONFIG['Coze']['access_token']
    run_async = CONFIG['Coze']['is_async'].lower() == 'true'

    endpoint = "https://api.coze.cn/v1/workflow/run"
    request_headers = {
        "Authorization": f"Bearer {coze_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "workflow_id": coze_workflow_id,
        "parameters": parameters,
        "is_async": run_async
    }

    response = requests.post(endpoint, json=payload, headers=request_headers)

    # Anything other than HTTP 200 is reported as an error dict.
    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

    # Success: the body is returned undecoded.
    return response.text
def call_coze_article_workflow(parameters):
    """Run the configured Coze workflow and extract its ``output`` value.

    The workflow id, access token and async flag are read from
    ``CONFIG['Coze']``.

    :param parameters: input parameters forwarded to the workflow
    :return: on HTTP 200, the ``output`` entry of the response's ``data``
        field (``''`` when that entry is absent); ``data`` is JSON-decoded
        first when it arrives as a string. On a non-200 status, or when
        decoding/extraction raises, a dict with the keys ``"error"`` and
        ``"detail"`` (the response body text) is returned instead.
    """
    coze_workflow_id = CONFIG['Coze']['workflow_id']
    coze_token = CONFIG['Coze']['access_token']
    run_async = CONFIG['Coze']['is_async'].lower() == 'true'

    endpoint = "https://api.coze.cn/v1/workflow/run"
    request_headers = {
        "Authorization": f"Bearer {coze_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "workflow_id": coze_workflow_id,
        "parameters": parameters,
        "is_async": run_async
    }

    response = requests.post(endpoint, json=payload, headers=request_headers)

    # Bail out early on any non-200 status.
    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

    try:
        envelope = json.loads(response.text)
        print(envelope)
        raw_data = envelope['data']
        # The data field may itself be a JSON document encoded as a string.
        decoded = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
        return decoded.get('output', '')
    except Exception as exc:
        # Any failure while decoding is reported as an error dict, not raised.
        return {
            "error": f"解析响应时出错:{exc}",
            "detail": response.text
        }
def call_coze_all_article_workflow(parameters, is_async=False):
    """Run the configured Coze workflow and extract its title and article.

    The workflow id, access token and async flag are read from
    ``CONFIG['Coze']``.

    :param parameters: input parameters forwarded to the workflow
    :param is_async: accepted but not consulted -- the flag sent to Coze is
        always the one from ``CONFIG['Coze']['is_async']``
    :return: on HTTP 200, a ``(title, article)`` tuple taken from the
        response's ``data`` field (each defaults to ``''`` when absent);
        ``data`` is JSON-decoded first when it arrives as a string. On a
        non-200 status, or when decoding/extraction raises, a dict with the
        keys ``"error"`` and ``"detail"`` is returned instead.

    NOTE(review): the failure value is a two-key dict, so a caller writing
    ``title, article = call_coze_all_article_workflow(...)`` would unpack it
    into the strings "error" and "detail" without raising. Callers should
    check for a dict before unpacking.
    """
    coze_workflow_id = CONFIG['Coze']['workflow_id']
    coze_token = CONFIG['Coze']['access_token']
    # The configured value wins; the is_async argument is deliberately unused.
    run_async = CONFIG['Coze']['is_async'].lower() == 'true'

    endpoint = "https://api.coze.cn/v1/workflow/run"
    request_headers = {
        "Authorization": f"Bearer {coze_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "workflow_id": coze_workflow_id,
        "parameters": parameters,
        "is_async": run_async
    }

    response = requests.post(endpoint, json=payload, headers=request_headers)

    # Bail out early on any non-200 status.
    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

    try:
        envelope = json.loads(response.text)
        print(envelope)
        raw_data = envelope['data']
        # The data field may itself be a JSON document encoded as a string.
        decoded = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
        return decoded.get('title', ''), decoded.get('article', '')
    except Exception as exc:
        # Any failure while decoding is reported as an error dict, not raised.
        return {
            "error": f"解析响应时出错:{exc}",
            "detail": response.text
        }