修改获取网页内容代码

This commit is contained in:
wsb1224 2025-07-23 17:33:55 +08:00
parent 42fc2e661f
commit 3b305f1d72
11 changed files with 1040 additions and 495 deletions

View File

@ -106,7 +106,7 @@ class ArticleReplaceApp(tk.Tk):
# 生成类型选择
ttk.Label(control_frame, text="生成类型:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
self.generation_type_var = tk.StringVar(value="短篇")
self.generation_type_var = tk.StringVar(value="文章")
self.generation_type_combo = ttk.Combobox(control_frame, textvariable=self.generation_type_var, values=["短篇", "文章"], width=10, state="readonly")
self.generation_type_combo.grid(row=3, column=1, padx=5, pady=5, sticky=tk.W)
self.generation_type_combo.bind("<<ComboboxSelected>>", self.on_generation_type_changed)
@ -216,6 +216,10 @@ class ArticleReplaceApp(tk.Tk):
self.max_threads_var = tk.StringVar(value=CONFIG['General']['max_threads'])
ttk.Spinbox(parent, from_=1, to=10, textvariable=self.max_threads_var, width=5).grid(row=4, column=1, padx=5,
pady=5, sticky=tk.W)
# 保存按钮
ttk.Button(parent, text="保存配置", command=self.save_general_config).grid(row=5, column=1, padx=5, pady=10,
sticky=tk.E)
def init_database_config(self, parent):
# 数据库主机
@ -239,8 +243,12 @@ class ArticleReplaceApp(tk.Tk):
ttk.Entry(parent, textvariable=self.db_name_var, width=30).grid(row=3, column=1, padx=5, pady=5)
# 测试连接按钮
ttk.Button(parent, text="测试连接", command=self.test_db_connection).grid(row=4, column=1, padx=5, pady=10,
ttk.Button(parent, text="测试连接", command=self.test_db_connection).grid(row=4, column=1, padx=5, pady=5,
sticky=tk.E)
# 保存按钮
ttk.Button(parent, text="保存配置", command=self.save_database_config).grid(row=5, column=1, padx=5, pady=10,
sticky=tk.E)
def init_dify_config(self, parent):
# Dify API Key
@ -262,6 +270,10 @@ class ArticleReplaceApp(tk.Tk):
ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
self.dify_input_data_template_var = tk.StringVar(value=CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')) # 添加默认值
ttk.Entry(parent, textvariable=self.dify_input_data_template_var, width=50).grid(row=3, column=1, padx=5, pady=5)
# 保存按钮
ttk.Button(parent, text="保存配置", command=self.save_dify_config).grid(row=4, column=1, padx=5, pady=10,
sticky=tk.E)
def init_coze_config(self, parent):
# 生成类型选择(与主页面联动)
@ -332,9 +344,16 @@ class ArticleReplaceApp(tk.Tk):
ttk.Label(config_frame, text="Input Data模板:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W)
# Variable already initialized in __init__
ttk.Entry(config_frame, textvariable=self.coze_input_data_template_var, width=50).grid(row=4, column=1, padx=5, pady=5)
# 保存按钮
ttk.Button(config_frame, text="保存配置", command=self.save_coze_config).grid(row=5, column=1, padx=5, pady=10,
sticky=tk.E)
# 更新模板列表
self.update_template_list()
# 自动加载上次使用的模板
self.load_last_used_template()
def init_baidu_config(self, parent):
# 百度 API Key
@ -346,6 +365,10 @@ class ArticleReplaceApp(tk.Tk):
ttk.Label(parent, text="Secret Key:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
self.baidu_secret_key_var = tk.StringVar(value=CONFIG['Baidu']['secret_key'])
ttk.Entry(parent, textvariable=self.baidu_secret_key_var, width=50).grid(row=1, column=1, padx=5, pady=5)
# 保存按钮
ttk.Button(parent, text="保存配置", command=self.save_baidu_config).grid(row=2, column=1, padx=5, pady=10,
sticky=tk.E)
def init_image_config(self, parent):
# 裁剪百分比
@ -398,6 +421,10 @@ class ArticleReplaceApp(tk.Tk):
# 预览按钮
ttk.Button(parent, text="预览效果", command=self.preview_image_effect).grid(row=4, column=3, padx=5, pady=5,
sticky=tk.E)
# 保存按钮
ttk.Button(parent, text="保存配置", command=self.save_image_config).grid(row=5, column=3, padx=5, pady=10,
sticky=tk.E)
def init_keywords_config(self, parent):
# 违禁词列表
@ -466,7 +493,138 @@ class ArticleReplaceApp(tk.Tk):
# 处理文本,将换行符替换为逗号
words = self.banned_words_text.get(1.0, tk.END).strip().replace('\n', ',')
CONFIG['Keywords']['banned_words'] = words
save_config(CONFIG)
messagebox.showinfo("保存成功", "违禁词列表已更新")
def save_general_config(self):
    """Persist the "General" settings form and refresh the matching module globals.

    The thread count is parsed and range-checked before anything is written,
    so a non-numeric or non-positive entry is rejected without being saved
    to the configuration. After saving, the article and image directories
    are created when missing. Every failure is reported through an error
    dialog instead of being raised to the caller.
    """
    # Module-level settings that mirror the [General] section.
    global USER_DIR_PATH, ARTICLES_BASE_PATH, IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS
    try:
        # Validate first: previously an invalid value was saved and only
        # afterwards failed in int().
        raw_max_threads = self.max_threads_var.get()
        max_threads = int(raw_max_threads)
        if max_threads < 1:
            raise ValueError(f"最大线程数必须大于等于 1:{raw_max_threads}")

        CONFIG['General']['chrome_user_dir'] = self.chrome_dir_var.get()
        CONFIG['General']['articles_path'] = self.articles_path_var.get()
        CONFIG['General']['images_path'] = self.images_path_var.get()
        CONFIG['General']['title_file'] = self.excel_file_var.get()
        CONFIG['General']['max_threads'] = raw_max_threads
        save_config(CONFIG)

        # Keep the module globals in sync with what was just saved.
        USER_DIR_PATH = CONFIG['General']['chrome_user_dir']
        ARTICLES_BASE_PATH = CONFIG['General']['articles_path']
        IMGS_BASE_PATH = CONFIG['General']['images_path']
        TITLE_BASE_PATH = CONFIG['General']['title_file']
        MAX_THREADS = max_threads

        # Create the output directories if they do not exist yet.
        os.makedirs(ARTICLES_BASE_PATH, exist_ok=True)
        os.makedirs(IMGS_BASE_PATH, exist_ok=True)

        messagebox.showinfo("保存成功", "常规配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存常规配置时出错:{e}")
def save_baidu_config(self):
    """Copy the Baidu API key and secret key from the form into CONFIG and save it.

    Success and failure are both reported through message boxes; nothing is raised.
    """
    try:
        baidu_section = CONFIG['Baidu']
        baidu_section['api_key'] = self.baidu_api_key_var.get()
        baidu_section['secret_key'] = self.baidu_secret_key_var.get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "百度API配置已保存")
    except Exception as err:
        messagebox.showerror("保存失败", f"保存百度API配置时出错{err}")
def save_image_config(self):
    """Copy every image-processing field from the form into CONFIG['ImageModify'] and save it.

    Each option ``<name>`` is read from the Tk variable ``self.<name>_var``.
    Success and failure are both reported through message boxes.
    """
    option_names = (
        'crop_percent',
        'min_rotation',
        'max_rotation',
        'min_brightness',
        'max_brightness',
        'watermark_text',
        'watermark_opacity',
        'overlay_opacity',
    )
    try:
        for option in option_names:
            # Resolve the variable lazily so options are written one by one, in order.
            CONFIG['ImageModify'][option] = getattr(self, f"{option}_var").get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "图片处理配置已保存")
    except Exception as err:
        messagebox.showerror("保存失败", f"保存图片处理配置时出错:{err}")
def save_database_config(self):
    """Copy the database connection fields from the form into CONFIG and save it.

    Success and failure are both reported through message boxes; nothing is raised.
    """
    try:
        db_section = CONFIG['Database']
        db_section['host'] = self.db_host_var.get()
        db_section['user'] = self.db_user_var.get()
        db_section['password'] = self.db_password_var.get()
        db_section['database'] = self.db_name_var.get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "数据库配置已保存")
    except Exception as err:
        messagebox.showerror("保存失败", f"保存数据库配置时出错:{err}")
def save_dify_config(self):
    """Copy the Dify fields (API key, user id, URL, input template) into CONFIG and save it.

    Success and failure are both reported through message boxes; nothing is raised.
    """
    try:
        dify_section = CONFIG['Dify']
        dify_section['api_key'] = self.dify_api_key_var.get()
        dify_section['user_id'] = self.dify_user_id_var.get()
        dify_section['url'] = self.dify_url_var.get()
        dify_section['input_data_template'] = self.dify_input_data_template_var.get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "Dify配置已保存")
    except Exception as err:
        messagebox.showerror("保存失败", f"保存Dify配置时出错{err}")
def save_coze_config(self):
    """Save the Coze form, into the selected template when there is one, and into CONFIG.

    With no list selection only the global ``CONFIG['Coze']`` values are written.
    With a valid selection the template is updated and stored first, then the
    same values are mirrored into the global section. An out-of-range selection
    shows an error and saves nothing.
    """
    try:
        picked = self.template_listbox.curselection()
        target = None

        if picked:
            position = picked[0]
            gen_type = self.coze_generation_type_var.get()

            type_known = gen_type in self.templates
            if not type_known or position >= len(self.templates[gen_type]):
                messagebox.showerror("错误", "无效的模板选择")
                return

            # Write the form values into the selected template and store the list.
            target = self.templates[gen_type][position]
            target['name'] = self.template_name_var.get()
            target['workflow_id'] = self.coze_workflow_id_var.get()
            target['access_token'] = self.coze_access_token_var.get()
            target['is_async'] = self.coze_is_async_var.get()
            target['input_data_template'] = self.coze_input_data_template_var.get()
            self.save_templates()

        # The global Coze section is written in both cases.
        coze_section = CONFIG['Coze']
        coze_section['workflow_id'] = self.coze_workflow_id_var.get()
        coze_section['access_token'] = self.coze_access_token_var.get()
        coze_section['is_async'] = self.coze_is_async_var.get()
        coze_section['input_data_template'] = self.coze_input_data_template_var.get()
        save_config(CONFIG)

        if target is None:
            messagebox.showinfo("保存成功", "Coze全局配置已保存")
            return

        # Flash the "saved" status, then clear it after two seconds.
        self.edit_status_label.config(text="已保存", foreground="green")
        self.after(2000, lambda: self.edit_status_label.config(text=""))
        messagebox.showinfo("保存成功", f"模板 '{target['name']}' 配置已保存")
    except Exception as err:
        messagebox.showerror("保存失败", f"保存Coze配置时出错{err}")
def on_generation_type_changed(self, event=None):
"""主页面生成类型改变时的处理"""
@ -498,6 +656,11 @@ class ArticleReplaceApp(tk.Tk):
if current_type in self.templates and index < len(self.templates[current_type]):
template = self.templates[current_type][index]
self.load_template_config(template)
# 更新上次使用的模板信息
CONFIG['Coze']['last_used_template'] = template['name']
CONFIG['Coze']['last_used_template_type'] = current_type
save_config(CONFIG) # 保存配置文件
def load_template_config(self, template):
"""加载模板配置到界面"""
@ -539,49 +702,48 @@ class ArticleReplaceApp(tk.Tk):
"""显示编辑状态"""
self.edit_status_label.config(text="未保存", foreground="red")
def add_template(self):
"""添加新模板"""
current_type = self.coze_generation_type_var.get()
# 获取当前模板列表中最大的序号
max_num = 0
for template in self.templates[current_type]:
try:
# 尝试将模板名称转换为整数
if template['name'].isdigit():
num = int(template['name'])
max_num = max(max_num, num)
except (ValueError, TypeError):
pass
template_name = f"{max_num + 1}"
# 获取当前配置作为默认值
default_workflow_id = CONFIG['Coze'].get('workflow_id', '')
default_access_token = CONFIG['Coze'].get('access_token', '')
default_is_async = CONFIG['Coze'].get('is_async', 'true')
default_template = CONFIG['Coze'].get('input_data_template',
'{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}')
new_template = {
'name': template_name,
'type': current_type,
'workflow_id': default_workflow_id,
'access_token': default_access_token,
'is_async': default_is_async,
'input_data_template': default_template
}
self.templates[current_type].append(new_template)
self.update_template_list()
self.save_templates()
# 选中新添加的模板
self.template_listbox.selection_clear(0, tk.END)
self.template_listbox.selection_set(len(self.templates[current_type]) - 1)
self.load_template_config(new_template)
self.edit_status_label.config(text="已创建", foreground="green")
self.after(2000, lambda: self.edit_status_label.config(text="未保存", foreground="red"))
if current_type not in self.templates:
self.templates[current_type] = []
# 弹出对话框让用户输入模板名称
new_template_name = simpledialog.askstring("新增模板", "请输入新模板的名称:")
if new_template_name:
new_template_name = new_template_name.strip()
if not new_template_name:
messagebox.showwarning("输入无效", "模板名称不能为空。")
return
# 检查模板名称是否重复
if any(t['name'] == new_template_name for t in self.templates[current_type]):
messagebox.showwarning("名称重复", f"模板名称 '{new_template_name}' 已存在,请使用其他名称。")
return
new_template = {
'name': new_template_name,
'workflow_id': '',
'access_token': '',
'is_async': 'true',
'input_data_template': '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}'
}
self.templates[current_type].append(new_template)
self.update_template_list()
self.save_templates()
# 选中新添加的模板
new_index = len(self.templates[current_type]) - 1
self.template_listbox.selection_clear(0, tk.END)
self.template_listbox.selection_set(new_index)
# selection_set会触发on_template_selected事件自动加载模板配置
# 延迟设置状态确保覆盖on_template_selected中设置的状态
self.after(100, lambda: self.edit_status_label.config(text="已添加", foreground="green"))
self.after(2100, lambda: self.edit_status_label.config(text=""))
else:
messagebox.showinfo("取消操作", "已取消新增模板。")
def delete_template(self):
"""删除选中的模板"""
@ -638,17 +800,16 @@ class ArticleReplaceApp(tk.Tk):
# 验证输入数据模板的JSON格式
try:
# 替换占位符以便验证JSON格式
test_template = input_template.replace('{article_text}', '')\
.replace('{link}', '')\
.replace('{weijin}', '')
test_template = input_template.replace('{article_text}', '""')\
.replace('{link}', '""')\
.replace('{weijin}', '""')\
.replace('{title_text}', '""')
json.loads(test_template)
except json.JSONDecodeError as e:
messagebox.showerror("错误", f"输入数据模板不是有效的JSON格式\n{str(e)}")
return False
return True
return True
def save_template(self):
"""保存当前模板配置"""
@ -661,14 +822,30 @@ class ArticleReplaceApp(tk.Tk):
current_type = self.coze_generation_type_var.get()
if current_type in self.templates and index < len(self.templates[current_type]):
template = self.templates[current_type][index]
template['name'] = self.template_name_var.get().strip()
new_name = self.template_name_var.get().strip()
if not new_name:
messagebox.showwarning("输入无效", "模板名称不能为空。")
return
# 检查新名称是否重复,排除当前模板自身
if new_name != template['name'] and any(t['name'] == new_name for t in self.templates[current_type]):
messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。")
return
template['name'] = new_name
template['workflow_id'] = self.coze_workflow_id_var.get().strip()
template['access_token'] = self.coze_access_token_var.get().strip()
template['is_async'] = self.coze_is_async_var.get()
template['input_data_template'] = self.coze_input_data_template_var.get().strip()
# 更新上次使用的模板信息
CONFIG['Coze']['last_used_template'] = template['name']
CONFIG['Coze']['last_used_template_type'] = current_type
self.update_template_list()
self.save_templates()
save_config(CONFIG) # 保存配置文件
self.edit_status_label.config(text="已保存", foreground="green")
self.after(2000, lambda: self.edit_status_label.config(text=""))
else:
@ -691,14 +868,28 @@ class ArticleReplaceApp(tk.Tk):
# 弹出重命名对话框
new_name = simpledialog.askstring("重命名模板", "请输入新的模板名称:", initialvalue=old_name)
if new_name and new_name.strip() and new_name != old_name:
template['name'] = new_name.strip()
if new_name:
new_name = new_name.strip()
if not new_name:
messagebox.showwarning("输入无效", "模板名称不能为空。")
return
if new_name == old_name:
messagebox.showinfo("未修改", "新名称与旧名称相同,无需重命名。")
return
# 检查新名称是否重复
if any(t['name'] == new_name for t in self.templates[current_type] if t != template):
messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。")
return
template['name'] = new_name
self.update_template_list()
self.save_templates()
# 重新选中重命名后的模板
self.template_listbox.selection_set(index)
self.edit_status_label.config(text="已重命名", foreground="green")
self.after(2000, lambda: self.edit_status_label.config(text=""))
else:
messagebox.showinfo("取消操作", "已取消重命名模板。")
def duplicate_template(self):
"""复制当前选中的模板"""
@ -715,34 +906,28 @@ class ArticleReplaceApp(tk.Tk):
template = self.templates[current_type][index]
new_template = template.copy()
# 获取当前模板列表中最大的副本序号
# 生成新的模板名称,确保唯一性
base_name = template['name']
max_num = 0
for t in self.templates[current_type]:
if t['name'].startswith(f"{base_name}_副本"):
try:
num = int(t['name'].split('_副本')[-1]) if t['name'].split('_副本')[-1] else 1
max_num = max(max_num, num)
except (IndexError, ValueError):
pass
# 设置新的模板名称
new_name = f"{base_name}_副本{max_num + 1 if max_num > 0 else ''}"
copy_num = 1
new_name = f"{base_name}_副本"
while any(t['name'] == new_name for t in self.templates[current_type]):
copy_num += 1
new_name = f"{base_name}_副本{copy_num}"
new_template['name'] = new_name
self.templates[current_type].append(new_template)
self.update_template_list()
self.save_templates()
# 选中新复制的模板
new_index = len(self.templates[current_type]) - 1
self.template_listbox.selection_clear(0, tk.END)
self.template_listbox.selection_set(new_index)
self.load_template_config(new_template)
# selection_set会触发on_template_selected事件自动加载模板配置
# 更新状态提示
self.edit_status_label.config(text=f"已复制'{new_name}'", foreground="green")
self.after(2000, lambda: self.edit_status_label.config(text="未保存", foreground="red"))
# 延迟设置状态确保覆盖on_template_selected中设置的状态
self.after(100, lambda: self.edit_status_label.config(text="已复制", foreground="green"))
self.after(2100, lambda: self.edit_status_label.config(text=""))
def use_template(self):
"""使用模板功能 - 弹出模板选择对话框并应用所选模板配置"""
@ -827,6 +1012,10 @@ class ArticleReplaceApp(tk.Tk):
CONFIG['Coze']['is_async'] = selected_template.get('is_async', 'true')
CONFIG['Coze']['input_data_template'] = selected_template.get('input_data_template', '')
# 保存上次使用的模板信息
CONFIG['Coze']['last_used_template'] = selected_template['name']
CONFIG['Coze']['last_used_template_type'] = current_type
# 保存配置
save_config(CONFIG)
@ -837,9 +1026,12 @@ class ArticleReplaceApp(tk.Tk):
self.template_listbox.selection_set(i)
break
# 显示成功提示
self.edit_status_label.config(text=f"已应用模板 '{selected_template['name']}'", foreground="green")
self.after(2000, lambda: self.edit_status_label.config(text=""))
# 加载模板配置到界面
self.load_template_config(selected_template)
# 延迟设置状态确保覆盖load_template_config设置的状态
self.after(100, lambda: self.edit_status_label.config(text=f"已应用模板 '{selected_template['name']}'", foreground="green"))
self.after(2100, lambda: self.edit_status_label.config(text=""))
# 关闭对话框
dialog.destroy()
@ -888,12 +1080,22 @@ class ArticleReplaceApp(tk.Tk):
value = templates_section[key]
# 确保value是字符串类型
if isinstance(value, str):
self.templates[template_type] = json.loads(value)
try:
self.templates[template_type] = json.loads(value)
except json.JSONDecodeError as e:
logger.warning(f"解析模板配置{key}失败: {e}")
self.templates[template_type] = []
else:
logger.warning(f"模板配置{key}的值不是字符串类型: {type(value)}")
self.templates[template_type] = []
# 确保每个类型都有列表
for template_type in ["短篇", "文章"]:
if template_type not in self.templates:
self.templates[template_type] = []
except Exception as e:
logger.error(f"加载模板配置失败: {e}")
# 确保模板字典已初始化
self.templates = {"短篇": [], "文章": []}
def save_templates(self):
"""保存模板到配置文件"""
@ -910,6 +1112,45 @@ class ArticleReplaceApp(tk.Tk):
except Exception as e:
logger.error(f"保存模板配置失败: {e}")
messagebox.showerror("保存失败", f"保存模板配置时出错:{e}")
def load_last_used_template(self):
    """Re-select the template recorded in CONFIG['Coze'] as last used and load it into the form.

    Nothing happens when no template name is recorded or its type is unknown.
    A recorded name that no longer exists is logged as a warning. Any
    exception is logged and swallowed, so no dialog is shown.
    """
    try:
        coze_section = CONFIG['Coze']
        remembered_name = coze_section.get('last_used_template', '')
        remembered_type = coze_section.get('last_used_template_type', '文章')

        if not remembered_name or remembered_type not in self.templates:
            return

        # Switch both generation-type selectors to the remembered type.
        self.coze_generation_type_var.set(remembered_type)
        self.generation_type_var.set(remembered_type)
        self.update_template_list()

        for position, candidate in enumerate(self.templates[remembered_type]):
            if candidate['name'] != remembered_name:
                continue

            # Select the entry, scroll it into view and load its settings.
            self.template_listbox.selection_clear(0, tk.END)
            self.template_listbox.selection_set(position)
            self.template_listbox.see(position)
            self.load_template_config(candidate)

            # Show a status message and clear it after three seconds.
            self.edit_status_label.config(text=f"已加载上次使用的模板 '{remembered_name}'")
            self.after(3000, lambda: self.edit_status_label.config(text=""))
            return

        logger.warning(f"未找到上次使用的模板: {remembered_name}")
    except Exception as err:
        # Fail silently: log only, no message box.
        logger.error(f"加载上次使用的模板失败: {err}")
def get_current_template(self):
"""获取当前选中的模板配置"""

View File

@ -126,7 +126,7 @@ def call_coze_article_workflow(parameters):
}
def call_coze_chang_article_workflow(parameters):
def call_coze_all_article_workflow(parameters,is_async=False):
"""
调用 Coze 工作流的函数
@ -134,10 +134,9 @@ def call_coze_chang_article_workflow(parameters):
:param is_async: 是否异步执行默认 False
:return: 工作流的执行结果
"""
workflow_id = CONFIG['Coze']['workflow_id']
access_token = CONFIG['Coze']['access_token']
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
is_async = CONFIG['Coze']['is_async'].lower() == 'False'
url = "https://api.coze.cn/v1/workflow/run"
headers = {
"Authorization": f"Bearer {access_token}",
@ -158,6 +157,7 @@ def call_coze_chang_article_workflow(parameters):
# 直接解析整个result字符串
result_dict = ast.literal_eval(response.text)
print(result_dict)
# 解析data字段
data_dict = ast.literal_eval(result_dict['data'])
@ -165,8 +165,7 @@ def call_coze_chang_article_workflow(parameters):
# 获取output的值
title = data_dict['title']
article = data_dict['article']
return title,article
return title, article
else:
return {
"error": f"请求失败,状态码:{response.status_code}",

View File

@ -18,7 +18,10 @@ DEFAULT_CONFIG = {
"Coze": {
"workflow_id": "",
"access_token": "",
"is_async": "false"
"is_async": "false",
"input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
"last_used_template": "",
"last_used_template_type": "文章"
},
"Database": {
"host": "27.106.125.150",

View File

@ -1,313 +0,0 @@
import threading
import queue
from ai_studio import call_dify_workflow, call_coze_workflow,call_coze_article_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ==============================主程序===========================
def process_link(link, ai_service):
    """
    Fetch one article, rewrite it through the selected AI service and save the result.

    The page is scraped with the Toutiao or WeChat extractor depending on the
    URL prefix. Links with an empty title or a title longer than 100
    characters are skipped. The rewritten text has ``*``, ``#`` and ``-``
    removed, must pass ``text_detection``, and is then written to
    ``ARTICLES_BASE_PATH``; any images are downloaded and processed afterwards.

    :param link: URL of the article to process
    :param ai_service: AI service to use, either "dify" or "coze"
    :raises ValueError: if ``ai_service`` is not "dify" or "coze"
    :raises Exception: any other error is logged and re-raised
    """
    from datetime import datetime

    try:
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []

        print(title_text)

        # Skip pages without a usable title.
        if title_text == "" or len(title_text) > 100:
            return

        # Database settings, used only to record links rejected for banned words.
        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']

        # Banned-word check (run against the title text).
        check_keywords = check_keywords_in_text(title_text)

        # NOTE(review): `title` is never read below; the call is kept only in
        # case the helper has side effects - confirm and remove.
        title = extract_content_until_punctuation(article_text).replace("正文:", "")

        logger.info(img_urls)

        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)
        logger.info(title_text)

        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                check_link_insert(host, user, password, database, link)
                return

            # The default must be valid JSON (single braces) so json.loads accepts it.
            input_data_template_str = CONFIG['Dify'].get(
                'input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                # Fill the {article_text} placeholder in every template value.
                input_data = {k: v.format(article_text=article_text)
                              for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError, ValueError, IndexError) as e:
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {
                    "old_article": article_text
                }

            message_content = call_dify_workflow(input_data)
        elif ai_service == "coze":
            logger.info("coze正在处理")
            input_data = {
                "title": title_text
            }
            message_content = call_coze_article_workflow(input_data)
        else:
            # Previously this fell through and failed later on an unbound name.
            raise ValueError(f"不支持的 AI 服务: {ai_service}")

        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)

        file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH, title_text)[0]
        article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{file_name}.txt")

        # Remove Markdown-style markers from the generated text.
        content = re.sub(r'[*#-]', '', message_content)
        print("改写完成的文章:" + content)

        # Only compliant articles are written to disk.
        if text_detection(content) != "合规":
            print("文章不合规")
            return
        print("文章合规")

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(content)
        logging.info('文本已经保存')

        if img_urls:
            download_and_process_images(img_urls, file_name)

    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
def link_to_text(num_threads=None, ai_service="dify"):
    """
    Process every link from the title spreadsheet, with no database de-duplication.

    Links are read with ``read_excel(TITLE_BASE_PATH)`` and all of them are
    handed to ``process_links_with_threads``. Links that were processed
    successfully are appended to ``use_link_path.txt``.

    :param num_threads: requested number of worker threads, or None
    :param ai_service: AI service name passed on to the workers
    :return: list of ``(link, success, error)`` tuples; empty if there are no links
    """
    use_link_path = 'use_link_path.txt'

    links = read_excel(TITLE_BASE_PATH)

    # Log the total once rather than once per link.
    logging.info(f"总共{len(links)}个链接")

    # No filtering in this variant: every link is processed.
    filtered_links = list(links)

    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    results = process_links_with_threads(filtered_links, num_threads, ai_service)

    # Record the links that were processed successfully.
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                f.write(link + "\n")

    return results
def link_to_mysql_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"):
    """
    Process the spreadsheet links that are not yet recorded in the database.

    Each link from ``read_excel(TITLE_BASE_PATH)`` is checked with
    ``check_link_exists``; only unknown links are handed to
    ``process_links_with_threads``. Links that were processed successfully
    are appended to ``use_link_path.txt``.

    :param prompt1: not used by this function
    :param prompt2: not used by this function
    :param num_threads: requested number of worker threads, or None
    :param ai_service: AI service name passed on to the workers
    :return: list of ``(link, success, error)`` tuples; empty if nothing is new
    """
    use_link_path = 'use_link_path.txt'

    links = read_excel(TITLE_BASE_PATH)

    host = CONFIG['Database']['host']
    user = CONFIG['Database']['user']
    password = CONFIG['Database']['password']
    database = CONFIG['Database']['database']

    # Log the total once rather than once per link.
    logging.info(f"总共{len(links)}个链接")

    # Keep only the links the database does not know yet.
    filtered_links = []
    for link in links:
        if check_link_exists(host, user, password, database, link):
            logger.info(f"链接已存在: {link}")
            continue
        filtered_links.append(link)
        logger.info(f"链接不存在: {link}")
        print("链接不存在,存储到过滤器中:", link)

    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    results = process_links_with_threads(filtered_links, num_threads, ai_service)

    # Record the links that were processed successfully.
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                f.write(link + "\n")

    return results
# Module-level task queue (links to process) and result queue (outcome tuples) used by the worker threads.
task_queue = queue.Queue()
result_queue = queue.Queue()
# Worker thread entry point
def worker(ai_service):
    """Take links from ``task_queue`` until a ``None`` sentinel arrives.

    Each link is passed to ``process_link``; the outcome is put on
    ``result_queue`` as ``(link, True, None)`` on success or
    ``(link, False, error_text)`` on failure. Errors outside link processing
    are logged and the loop continues.
    """
    while True:
        try:
            link = task_queue.get()
            if link is None:
                # Stop signal.
                break

            try:
                logger.info(f"开始处理链接:{link}")
                process_link(link, ai_service)
            except Exception as link_error:
                result_queue.put((link, False, str(link_error)))
                logger.error(f"处理链接 {link} 时出错: {link_error}")
            else:
                result_queue.put((link, True, None))

            # Mark the task as finished whether it succeeded or not.
            task_queue.task_done()
        except Exception as loop_error:
            logger.error(f"工作线程出错: {loop_error}")
# Process links on a pool of worker threads
def process_links_with_threads(links, num_threads=None, ai_service="dify"):
    """Run ``worker`` threads over ``links`` and return the collected results.

    The thread count is capped by ``MAX_THREADS`` and by the number of links.
    Both module-level queues are emptied first, then one ``None`` sentinel per
    thread is queued after the links so every worker stops.

    :param links: links to process
    :param num_threads: requested thread count; None means use ``MAX_THREADS``
    :param ai_service: AI service name passed to every worker
    :return: list of the tuples the workers put on ``result_queue``
    """
    link_total = len(links)
    if num_threads is None:
        pool_size = min(MAX_THREADS, link_total)
    else:
        pool_size = min(num_threads, MAX_THREADS, link_total)

    # Drop anything left over in the queues.
    for stale_queue in (task_queue, result_queue):
        while not stale_queue.empty():
            stale_queue.get()

    # Start the workers; each one receives the chosen AI service.
    pool = []
    for _ in range(pool_size):
        thread = threading.Thread(target=worker, args=(ai_service,), daemon=True)
        thread.start()
        pool.append(thread)

    # Queue the work, followed by one stop signal per worker.
    for link in links:
        task_queue.put(link)
    for _ in pool:
        task_queue.put(None)

    # Wait for every worker to finish.
    for thread in pool:
        thread.join()

    # Collect whatever the workers reported.
    collected = []
    while not result_queue.empty():
        collected.append(result_queue.get())

    return collected

View File

@ -2,7 +2,7 @@ import threading
import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow, call_coze_workflow,call_coze_article_workflow,call_coze_chang_article_workflow
from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow
from databases import *
from images_edit import download_and_process_images
@ -105,24 +105,27 @@ def process_link(link_info, ai_service, current_template=None,generation_type=No
try:
# 从配置加载 Coze input_data 模板
input_data_template_str = CONFIG['Coze'].get('input_data_template',
'{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}')
input_data_template_str = CONFIG['Coze'].get('input_data_template')
# 解析模板字符串为字典
input_data_template = json.loads(input_data_template_str)
# 使用实际变量格式化模板
input_data = input_data_template
except (json.JSONDecodeError, KeyError, AttributeError) as e:
logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.")
input_data = {
"article": article_text
}
try:
title = ""
if generation_type == "短篇":
input_data = {
"article": article_text
}
print("coze中输入",input_data)
message_content = call_coze_article_workflow(input_data)
elif generation_type == "文章":
title, message_content = call_coze_chang_article_workflow(input_data)
print("原文中标题为:",title_text)
print("原文中内容为:",article_text)
input_data = {
"title":title_text,
"article": article_text
}
print("发送的请求数据为:",input_data)
title, message_content = call_coze_all_article_workflow(input_data)
finally:
@ -136,6 +139,10 @@ def process_link(link_info, ai_service, current_template=None,generation_type=No
# 获取当前时间并格式化
current_time = datetime.now().strftime("%H:%M:%S")
print("原文章", article_text)
print("========================")
print("改写后的文章",message_content)
# 打印当前时间
print("当前时间:", current_time)
file_name = ""

View File

@ -0,0 +1,25 @@
{
"architectures": [
"BertForMaskedLM"
],
"attention_probs_dropout_prob": 0.1,
"directionality": "bidi",
"hidden_act": "gelu",
"hidden_dropout_prob": 0.1,
"hidden_size": 768,
"initializer_range": 0.02,
"intermediate_size": 3072,
"layer_norm_eps": 1e-12,
"max_position_embeddings": 512,
"model_type": "bert",
"num_attention_heads": 12,
"num_hidden_layers": 12,
"pad_token_id": 0,
"pooler_fc_size": 768,
"pooler_num_attention_heads": 12,
"pooler_num_fc_layers": 3,
"pooler_size_per_head": 128,
"pooler_type": "first_token_transform",
"type_vocab_size": 2,
"vocab_size": 21128
}

View File

@ -0,0 +1,464 @@
import re
import random
import argparse
import sys
import os
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path
import logging
class TextProcessor:
"""文本处理器类,支持句子拆分和字符交换"""
def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
"""
初始化文本处理器
Args:
min_length: 句子长度阈值
custom_punctuation: 自定义标点符号如果为None则使用默认标点
"""
self.min_length = min_length
self.sentence_endings = custom_punctuation or r'[?!;]'
self.statistics = {
'total_sentences': 0,
'processed_sentences': 0,
'total_chars': 0,
'swapped_chars': 0
}
# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
def split_sentences(self, text: str) -> List[Tuple[str, str]]:
"""
按标点符号拆分句子保留标点符号
Args:
text: 输入文本
Returns:
List[Tuple[str, str]]: 每个元组包含 (句子内容, 标点符号)
"""
if not text.strip():
return []
# 使用正则表达式拆分,保留分隔符
parts = re.split(f'({self.sentence_endings})', text)
sentences = []
i = 0
while i < len(parts):
content = parts[i].strip()
if content: # 非空内容
# 检查下一个部分是否是标点符号
if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]):
punctuation = parts[i + 1]
i += 2
else:
punctuation = ''
i += 1
sentences.append((content, punctuation))
self.statistics['total_sentences'] += 1
else:
i += 1
return sentences
def swap_random_chars(self, sentence: str) -> str:
"""
对超长句子随机交换相邻两个字符的顺序
Args:
sentence: 输入句子
Returns:
str: 处理后的句子
"""
# 边界情况处理
if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
return sentence
# 转换为字符列表便于操作
chars = list(sentence)
original_length = len(chars)
# 确定可交换的范围(避开首尾字符,且需要成对相邻)
# 对于长度为n的句子可交换的相邻对位置为(1,2), (2,3), ..., (n-3,n-2)
start_idx = 1
end_idx = len(chars) - 3 # 最后一个可交换对的起始位置
if end_idx < start_idx:
return sentence
try:
# 随机选择一个相邻对的起始位置
swap_start = random.randint(start_idx, end_idx)
swap_end = swap_start + 1
# 交换相邻的两个字符
chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start]
# 更新统计信息
self.statistics['processed_sentences'] += 1
self.statistics['swapped_chars'] += 2
self.logger.debug(f"交换相邻位置 {swap_start}{swap_end},句子长度:{original_length}")
except (ValueError, IndexError) as e:
self.logger.warning(f"字符交换失败:{e}")
return sentence
return ''.join(chars)
def process_text(self, text: str) -> str:
"""
处理文本拆分句子并对超长句子进行字符交换
Args:
text: 输入文本
Returns:
str: 处理后的文本
"""
if not text:
return text
# 重置统计信息
self.statistics = {
'total_sentences': 0,
'processed_sentences': 0,
'total_chars': len(text),
'swapped_chars': 0
}
# 按段落分割
paragraphs = text.split('\n')
processed_paragraphs = []
for paragraph in paragraphs:
if not paragraph.strip():
processed_paragraphs.append(paragraph)
continue
# 拆分句子
sentences = self.split_sentences(paragraph)
# 处理每个句子
processed_sentences = []
for sentence_content, punctuation in sentences:
# 对句子内容进行字符交换
processed_content = self.swap_random_chars(sentence_content)
processed_sentences.append(processed_content + punctuation)
# 重新组合句子
processed_paragraph = ''.join(processed_sentences)
processed_paragraphs.append(processed_paragraph)
return '\n'.join(processed_paragraphs)
def get_statistics(self) -> Dict[str, Any]:
    """Return a shallow copy of the current processing counters."""
    snapshot = self.statistics.copy()
    return snapshot
def print_statistics(self):
"""打印处理统计信息"""
stats = self.get_statistics()
print("\n" + "=" * 50)
print("处理统计信息:")
print(f"总字符数:{stats['total_chars']}")
print(f"总句子数:{stats['total_sentences']}")
print(f"处理句子数:{stats['processed_sentences']}")
print(f"交换字符数:{stats['swapped_chars']}")
if stats['total_sentences'] > 0:
print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
print("=" * 50)
class FileHandler:
    """File helper that groups the read and write operations of the tool."""

    @staticmethod
    def read_file(filename: str, encodings: Optional[List[str]] = None) -> str:
        """
        Read a text file, trying several encodings in order.

        With the default list the final 'latin-1' attempt accepts any byte
        sequence, so UnicodeDecodeError is only reachable with a custom list.

        Args:
            filename: Path of the file to read.
            encodings: Encodings to try, in order. None or an empty list
                selects the default list (utf-8, gbk, gb2312, latin-1).
        Returns:
            str: File content decoded with the first encoding that works.
        Raises:
            FileNotFoundError: The file does not exist.
            PermissionError: The file is not readable.
            UnicodeDecodeError: None of the encodings could decode the file.
        """
        if not os.path.exists(filename):
            raise FileNotFoundError(f"文件 '{filename}' 不存在")
        if not os.access(filename, os.R_OK):
            raise PermissionError(f"没有读取文件 '{filename}' 的权限")
        if not encodings:
            encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
        last_error = None
        for encoding in encodings:
            try:
                with open(filename, 'r', encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError as exc:
                # Remember the failure and fall through to the next encoding
                last_error = exc
                continue
            logging.info(f"使用 {encoding} 编码成功读取文件:{filename}")
            return content
        # UnicodeDecodeError requires (encoding, object, start, end, reason);
        # reuse the details of the last failure and attach our own reason.
        raise UnicodeDecodeError(
            last_error.encoding,
            last_error.object,
            last_error.start,
            last_error.end,
            f"无法解码文件 '{filename}',尝试的编码格式:{encodings}",
        ) from last_error

    @staticmethod
    def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
        """
        Write text to a file, creating the parent directory when needed.

        Args:
            filename: Output file path.
            content: Text to write.
            encoding: Encoding used for the output file.
        Raises:
            PermissionError: The file cannot be written due to permissions.
            OSError: Any other operating-system level failure while writing.
        """
        # Make sure the target directory exists
        output_dir = os.path.dirname(filename)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)
        try:
            with open(filename, 'w', encoding=encoding) as f:
                f.write(content)
            logging.info(f"成功写入文件:{filename}")
        except PermissionError as e:
            # Chain the original error so errno and traceback stay available
            raise PermissionError(f"没有写入文件 '{filename}' 的权限") from e
        except OSError as e:
            raise OSError(f"写入文件 '{filename}' 时发生错误:{e}") from e
def setup_argument_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the sentence character-swap tool."""
    # Shown verbatim after the option list (RawDescriptionHelpFormatter)
    usage_examples = """
使用示例
%(prog)s -f input.txt # 处理文件
%(prog)s -t "你的文本内容" # 直接处理文本
%(prog)s -f input.txt -l 20 # 设置长度阈值为20
%(prog)s -f input.txt -o output.txt # 输出到文件
%(prog)s -f input.txt -p "。!?" -s # 自定义标点符号并显示统计
"""
    cli = argparse.ArgumentParser(
        description='文本句子字符交换处理器',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    # Input source: exactly one of file / text / stdin is required
    source = cli.add_mutually_exclusive_group(required=True)
    source.add_argument('-f', '--file', help='输入文件路径')
    source.add_argument('-t', '--text', help='直接输入文本')
    source.add_argument('--stdin', action='store_true', help='从标准输入读取文本')
    # Processing options first, then the miscellaneous switches, in help order
    option_table = (
        (('-l', '--length'), {'type': int, 'default': 30, 'help': '句子长度阈值默认30'}),
        (('-p', '--punctuation'), {'help': '自定义标点符号(默认:。!?;?!;'}),
        (('-o', '--output'), {'help': '输出文件路径'}),
        (('-e', '--encoding'), {'default': 'utf-8', 'help': '输出文件编码默认utf-8'}),
        (('-s', '--statistics'), {'action': 'store_true', 'help': '显示处理统计信息'}),
        (('-v', '--verbose'), {'action': 'store_true', 'help': '显示详细日志'}),
        (('--seed',), {'type': int, 'help': '随机数种子(用于测试)'}),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli
def _punctuation_to_pattern(punctuation):
    """Turn the -p value (a run of punctuation characters) into a regex character class."""
    if not punctuation:
        return None
    # A value that already is a character class is passed through unchanged
    if punctuation.startswith('[') and punctuation.endswith(']'):
        return punctuation
    return '[' + re.escape(punctuation) + ']'


def main():
    """Command-line entry point: parse arguments, read the input, process it and emit the result.

    Exits with status 1 on input or processing errors and with status 0 when
    the input text is blank.
    """
    parser = setup_argument_parser()
    args = parser.parse_args()
    # Log level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Random seed (for reproducible runs); compare with None so that 0 is honoured
    if args.seed is not None:
        random.seed(args.seed)
    # Fetch the input text
    try:
        # Compare with None so that an explicitly empty -t/-f value is not
        # mistaken for "no input source given"
        if args.file is not None:
            text = FileHandler.read_file(args.file)
        elif args.text is not None:
            text = args.text
        elif args.stdin:
            text = sys.stdin.read()
        else:
            print("错误:请指定输入源")
            sys.exit(1)
        if not text.strip():
            print("警告:输入文本为空")
            sys.exit(0)
    except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
        print(f"错误:{e}")
        sys.exit(1)
    # Build the processor and run it
    try:
        processor = TextProcessor(
            min_length=args.length,
            # -p takes plain punctuation characters (see the usage examples),
            # while the processor interpolates its argument into a regex
            custom_punctuation=_punctuation_to_pattern(args.punctuation)
        )
        processed_text = processor.process_text(text)
        # Emit the result
        if args.output:
            FileHandler.write_file(args.output, processed_text, args.encoding)
            print(f"处理完成,结果已保存到 '{args.output}'")
        else:
            print("处理结果:")
            print("-" * 50)
            print(processed_text)
        # Optional statistics
        if args.statistics:
            processor.print_statistics()
    except Exception as e:
        # Top-level boundary: report, optionally with traceback, and fail
        print(f"处理过程中发生错误:{e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)
# 单元测试
def run_tests():
    """Run the basic built-in self-checks; raises AssertionError on the first failure."""
    print("运行单元测试...")
    # Pass the terminator set explicitly so the checks do not depend on the
    # processor's default punctuation.
    processor = TextProcessor(min_length=6, custom_punctuation=r'[。!?;?!;]')
    # Test 1: plain sentence splitting
    test_text = "这是第一句。这是第二句!第三句?"
    sentences = processor.split_sentences(test_text)
    assert len(sentences) == 3, f"期望3个句子实际{len(sentences)}"
    # The terminator is returned alongside the sentence content
    assert sentences[0] == ("这是第一句", "。"), f"第一句解析错误:{sentences[0]}"
    # Test 2: adjacent character swap
    long_sentence = "这是一个很长的句子用来测试字符交换功能"
    random.seed(42)  # fixed seed so the run is reproducible
    result = processor.swap_random_chars(long_sentence)
    assert result != long_sentence, "长句子应该被修改"
    assert len(result) == len(long_sentence), "交换后长度应该不变"
    # Exactly two positions may differ after one adjacent swap
    diff_count = sum(1 for a, b in zip(long_sentence, result) if a != b)
    assert diff_count == 2, f"应该只有2个字符位置发生变化实际{diff_count}"
    # Test 3: short sentences stay unchanged
    short_sentence = "短句"
    result = processor.swap_random_chars(short_sentence)
    assert result == short_sentence, "短句子不应该被修改"
    # Test 4: edge case, empty string
    empty_result = processor.swap_random_chars("")
    assert empty_result == "", "空字符串应该保持不变"
    print("✓ 所有测试通过!")
# 示例使用
def replace_text(text):
    """
    Dispatch on sys.argv: run the self-tests, the CLI, or a demo on ``text``.

    Args:
        text: Text processed in demo mode (when no command-line arguments are present).
    Returns:
        The processed text in demo mode; None after a command-line run.
        With ``test`` as first argument the process exits after the self-tests.
    """
    # Self-test mode
    if len(sys.argv) > 1 and sys.argv[1] == 'test':
        run_tests()
        sys.exit(0)
    # Command-line mode: main() prints or writes its own output, so there is
    # no processed text to hand back here.
    if len(sys.argv) > 1:
        main()
        return None
    # Demo mode
    sample_text = text
    print("示例演示:")
    print("原文:")
    print(sample_text)
    print("\n" + "=" * 50 + "\n")
    processor = TextProcessor(min_length=9)
    processed = processor.process_text(sample_text)
    print("处理后:")
    print(processed)
    processor.print_statistics()
    print("\n使用说明:")
    print("命令行用法:")
    print(" python script.py -f input.txt # 处理文件")
    print(" python script.py -t '你的文本内容' # 直接处理文本")
    print(" python script.py -f input.txt -l 20 # 设置长度阈值为20")
    print(" python script.py -f input.txt -o output.txt # 输出到文件")
    print(" python script.py -f input.txt -p '。!?' -s # 自定义标点符号并显示统计")
    print(" python script.py test # 运行单元测试")
    return processed
# Sample article passed to replace_text below as demo input.
text = """盘龙江又冒出“神秘生物”啦!这次可不是娃娃鱼,网友都说:这届市民太有才咯!
01 跑步都能碰到怪鱼昆明市民这操作简直笑死人
咱就说啊最近昆明盘龙江里的神秘生物是不是有点太多啦上个月万彩城河段才惊现粉色娃娃鱼前几天又有市民在江边跑步的时候突然瞅见水里游着一条浑身雪白的怪鱼远远看去老像国家二级保护动物娃娃鱼了这位热心肠的市民啥也没说直接就报了警还特别贴心地把鱼捞上岸装进塑料袋里就好像生怕这鱼跑了似的警察赶到的时候现场都围了一圈人在那看热闹呢有人拍照有人录视频不知道的还以为在江边搞啥生物展览会
02 蝾螈假装娃娃鱼森林公安说这是家养的
民警一看这鱼长得还真有点特别赶紧联系森林公安来瞅瞅结果这剧情反转得厉害啊这压根就不是娃娃鱼而是一条跟娃娃鱼长得很像的蝾螈更逗的是森林公安民警拎着塑料袋看了老半天还补了一句这是家养的这时候我都能想象到围观群众一脸懵的样子
网友的神评论都刷爆屏了
蝾螈我就出来溜达溜达咋就进局子了呢
我建议把盘龙江改名叫神奇动物江算了下次会不会冒出尼斯湖水怪啊
这届市民也太负责了连家养的宠物都要报警上交
03 前面有粉色娃娃鱼后面有白色蝾螈盘龙江成网红打卡点
其实这已经是盘龙江今年第二次上热搜啦4月份的时候有阿姨在江里发现一条1.5米长12公斤重的粉色娃娃鱼当时还把专业救援队都给叫来了这次虽然是个乌龙事儿但网友都开始瞎想连续剧情节了下次是不是该轮到金色锦鲤啦
最逗的是评论区有人把自家鱼缸的照片都晒出来了警察叔叔我家这条金龙鱼要不要也交上去啊手动狗头
04 警察叔叔重点提醒这些动物可不能随便抓
虽说这次是虚惊一场但民警还是一本正经地提醒大家野生蝾螈和娃娃鱼可都是国家二级保护动物自己私自去抓或者养那可是可能要吃法律官司的特别是现在有些短视频平台上还有人把保护动物当宠物卖起一些什么小恐龙六角鱼之类的花里胡哨的名字来忽悠人大家可千万别上当
05 吃瓜群众应对指南
要是碰到不认识的动物该咋办呢记住这个口诀就行
1 别伸手去碰万一这动物有毒或者带着病菌呢
2 别给它投喂吃的乱喂东西可能会把它们害死
3 赶紧报警专业的事儿就交给专业的人来办
最后来个灵魂提问**你觉得盘龙江下次会出现啥神奇生物**欢迎在评论区尽情开脑洞
本文信息来源昆明警方发布都市条形码等官方通报
谢谢大家看这篇文章哈欢迎在评论区留下你的神吐槽"""
# NOTE(review): no `if __name__ == "__main__":` guard is visible around the two
# statements below, so they appear to run on import as well - confirm and
# consider guarding them.
result = replace_text(text)
print(result)

View File

@ -1,56 +1,13 @@
import requests
from get_web_content import toutiao_w_extract_content
# 使用示例
if __name__ == "__main__":
url = "https://www.toutiao.com/article/7527481094266962473/"
title, content, images = toutiao_w_extract_content(url)
def call_coze_article_workflow(workflow_id,access_token,parameters,is_async=False):
    """
    Run a Coze workflow over HTTP and return the generated title and article.

    :param workflow_id: identifier sent in the request body as ``workflow_id``
    :param access_token: bearer token sent in the ``Authorization`` header
    :param parameters: input parameters for the workflow (dict)
    :param is_async: whether to request asynchronous execution, default False
    :return: ``(title, article)`` on HTTP 200; otherwise a dict with the keys
        ``error`` and ``detail`` (callers must check the type before unpacking)
    """
    import ast
    import json

    def _decode(raw):
        # The payload is parsed as JSON so that true/false/null are accepted;
        # literal_eval is kept as a fallback for Python-literal payloads.
        try:
            return json.loads(raw)
        except ValueError:
            return ast.literal_eval(raw)

    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }
    # The response body is decoded first; its 'data' field is itself a
    # serialized object and is decoded a second time.
    result_dict = _decode(response.text)
    data_dict = _decode(result_dict['data'])
    title = data_dict['title']
    article = data_dict['article']
    return title, article
# Driver script: call the workflow once with placeholder parameters and print the result.
workflow_id = "7509764025128845366"
# NOTE(review): a personal access token is hard-coded here and committed to the
# repository - it should be revoked and loaded from configuration or the
# environment instead.
access_token = "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91"
parameters = {
"title":"1",
"article":"1"
}
# NOTE(review): on a non-200 response call_coze_article_workflow returns a
# two-key dict, so this unpacking silently yields the key names
# ("error", "detail") instead of a title and an article.
title,article = call_coze_article_workflow(workflow_id,access_token,parameters)
print(title,article)
print(f"标题: {title}")
print(f"内容长度: {len(content)}")
print(f"图片数量: {len(images)}")
print("图片URLs:")
for i, img_url in enumerate(images, 1):
print(f"{i}. {img_url}")

File diff suppressed because one or more lines are too long

View File

@ -3,7 +3,7 @@ import json
import os
import random
import re
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
from docx.enum.text import WD_ALIGN_PARAGRAPH
@ -192,13 +192,19 @@ def create_word_document(text, image_folder, output_path, title):
:param output_path:
:return:
"""
doc = Document()
paragraphs = split_text_into_paragraphs(text)
insert_images_into_paragraphs(paragraphs, image_folder, doc, title)
# modify_document(doc)
doc.save(output_path)
format_word_document(output_path, output_path)
print(f'文档已保存到: {output_path}')
try:
doc = Document()
paragraphs = split_text_into_paragraphs(text)
insert_images_into_paragraphs(paragraphs, image_folder, doc, title)
# modify_document(doc)
doc.save(output_path)
try:
format_word_document(output_path, output_path)
except Exception as e:
print(f"格式化文档 {output_path} 时出错: {e}")
print(f'文档已保存到: {output_path}')
except Exception as e:
print(f"创建文档 {output_path} 时出错: {e}")
# 读取指定路径下txt文本的内容
@ -208,8 +214,12 @@ def read_text_file(file_path):
:param file_path:
:return:
"""
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except Exception as e:
print(f"读取文件 {file_path} 时出错: {e}")
return ""
def get_file_name(file_path):
@ -260,41 +270,56 @@ def apply_random_style(paragraph):
def txt2docx(txt_path, image_path, keep_txt=True):
file_path = txt_path
txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if
try:
txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if
txt.lower().endswith(('txt'))])
except Exception as e:
print(f"读取文件夹 {file_path} 时出错: {e}")
sg.popup_error(f"读取文件夹 {file_path} 时出错: {e}")
return
img_path = image_path
for txt in txts:
print("正在修改:" + txt)
text = read_text_file(txt)
# print(text)
txt_name = get_file_name(txt)
title_name = txt_name.replace(".txt", "")
title = title_name
print(title)
if "正文:" in text:
new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
else:
new_text = text.replace("```markdown", "").replace("```", "")
content = new_text
# image_folder = img_path + r'\\' + txt_name.replace(".txt", "").rstrip(".")
# image_folder = os.path.join(img_path, txt_name.replace(".txt", "").rstrip("."))
from pathlib import Path
from pathlib import Path
try:
print("正在修改:" + txt)
text = read_text_file(txt)
if not text: # 如果读取失败,跳过此文件
print(f"跳过文件: {txt} (读取失败)")
continue
# print(text)
txt_name = get_file_name(txt)
title_name = txt_name.replace(".txt", "")
title = title_name
print(title)
if "正文:" in text:
new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
else:
new_text = text.replace("```markdown", "").replace("```", "")
content = new_text
from pathlib import Path
img_path = Path(img_path)
image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")
img_path = Path(img_path)
image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")
# crop_and_replace_images(image_folder)
# crop_and_replace_images(image_folder)
create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)
create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)
# 根据用户选择决定是否删除原始txt文件
if not keep_txt:
os.remove(txt)
print(f"已删除原始文件: {txt}")
else:
print(f"保留原始文件: {txt}")
# 根据用户选择决定是否删除原始txt文件
if not keep_txt:
try:
os.remove(txt)
print(f"已删除原始文件: {txt}")
except Exception as e:
print(f"删除文件 {txt} 时出错: {e}")
else:
print(f"保留原始文件: {txt}")
except Exception as e:
print(f"处理文件 {txt} 时出错: {e}")
continue # 继续处理下一个文件
# 加载设置
@ -313,8 +338,20 @@ def save_settings(settings):
# 自定义函数,用于处理用户选择的文件夹
def process_folders(folder1, folder2, keep_txt=True):
    """
    Validate the two folders chosen by the user and convert the articles once.

    :param folder1: folder containing the article .txt files
    :param folder2: folder containing the image folders
    :param keep_txt: passed through to txt2docx; when False the source .txt files are removed
    :return: None; the outcome is reported through popup dialogs
    """
    # Check that both folders exist before doing any work
    if not os.path.exists(folder1):
        sg.popup_error(f"文章文件夹不存在: {folder1}")
        return
    if not os.path.exists(folder2):
        sg.popup_error(f"图片文件夹不存在: {folder2}")
        return
    # Run the conversion exactly once, inside the guarded block, so that a
    # failure is reported in a dialog instead of escaping to the caller
    try:
        txt2docx(folder1, folder2, keep_txt)
        sg.popup("处理完成!")
    except Exception as e:
        sg.popup_error(f"处理过程中出错: {e}")
# 加载之前的设置

8
text translation/.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml