ArticleReplace/ArticleReplace.py
2025-11-29 14:37:27 +08:00

1491 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import sys
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
import time
import random
import threading
import customtkinter as ctk
from config import *
from tkinter import messagebox, filedialog, simpledialog
import pandas as pd
import pymysql
from main_process import link_to_text, task_queue, result_queue
from auth_validator import AuthValidator
# Raise the interpreter-wide recursion limit to 5000 frames.
# NOTE(review): the reason is not visible in this file — presumably some dependency
# recurses deeply; confirm before changing or removing.
sys.setrecursionlimit(5000)
class ArticleReplaceApp(ctk.CTk):
def __init__(self):
    """Create the main window, its three tabs, the runtime state and every page."""
    super().__init__()
    self.title("文章工作流调用工具(软件仅供交流使用)")
    self.geometry("900x600")

    # Application-wide CustomTkinter look and feel.
    ctk.set_appearance_mode("System")
    ctk.set_default_color_theme("blue")

    # Top-level tab view; tabs are added in display order.
    tabview = ctk.CTkTabview(self)
    tabview.pack(fill=ctk.BOTH, expand=True, padx=10, pady=10)
    self.notebook = tabview
    self.main_frame, self.config_frame, self.disclaimer_frame = (
        tabview.add(tab_title) for tab_title in ("主页面", "配置", "免责声明")
    )

    # Processing state shared with the worker thread bookkeeping.
    self.running = False
    self.thread = None
    self.total_links = self.processed_links = 0

    # Variables backing the Coze template form, seeded from the saved config.
    coze_section = CONFIG['Coze']
    self.template_name_var = ctk.StringVar()
    self.coze_workflow_id_var = ctk.StringVar(value=coze_section['workflow_id'])
    self.coze_access_token_var = ctk.StringVar(value=coze_section['access_token'])
    self.coze_is_async_var = ctk.StringVar(value=coze_section.get('is_async', 'true'))

    # Templates grouped by generation type.
    self.templates = {"短篇": [], "文章": []}

    # Build the pages: main first, then settings, then the disclaimer.
    for build_page in (self.init_main_frame, self.init_config_frame, self.init_disclaimer_frame):
        build_page()

    # Route the window-close button through on_close.
    self.protocol("WM_DELETE_WINDOW", self.on_close)
def init_main_frame(self):
    """Build the main tab: run controls in a left column, the log view on the right."""
    cell = {"padx": 5, "pady": 5}
    left_cell = dict(cell, sticky=ctk.W)

    # ---- left: control panel ----
    controls = ctk.CTkFrame(self.main_frame, fg_color="transparent")
    controls.pack(side=ctk.LEFT, fill=ctk.Y, padx=10, pady=10)

    # Row 0: Excel workbook path with a browse button.
    ctk.CTkLabel(controls, text="Excel文件:").grid(row=0, column=0, **left_cell)
    self.excel_path_var = ctk.StringVar(value=TITLE_BASE_PATH)
    path_entry = ctk.CTkEntry(controls, textvariable=self.excel_path_var, width=150)
    path_entry.grid(row=0, column=1, **cell)
    browse = ctk.CTkButton(controls, text="浏览", command=self.browse_excel, width=60)
    browse.grid(row=0, column=2, **cell)

    # Row 1: thread count.
    ctk.CTkLabel(controls, text="线程数:").grid(row=1, column=0, **left_cell)
    self.thread_count_var = ctk.StringVar(value="1")
    threads_entry = ctk.CTkEntry(controls, textvariable=self.thread_count_var, width=50)
    threads_entry.grid(row=1, column=1, **left_cell)

    # Row 2: workflow provider.
    ctk.CTkLabel(controls, text="工作流选择:").grid(row=2, column=0, **left_cell)
    self.ai_service_var = ctk.StringVar(value="coze")
    provider_picker = ctk.CTkComboBox(
        controls,
        variable=self.ai_service_var,
        values=["dify", "coze"],
        width=120,
        state="readonly",
    )
    provider_picker.grid(row=2, column=1, **left_cell)

    # Row 3: generation type; changes are forwarded to on_generation_type_changed.
    ctk.CTkLabel(controls, text="生成类型:").grid(row=3, column=0, **left_cell)
    self.generation_type_var = ctk.StringVar(value="文章")
    type_picker = ctk.CTkComboBox(
        controls,
        variable=self.generation_type_var,
        values=["短篇", "文章"],
        width=120,
        state="readonly",
    )
    self.generation_type_combo = type_picker
    type_picker.grid(row=3, column=1, **left_cell)
    type_picker.configure(command=self.on_generation_type_changed)

    # Row 4: start button spanning the whole panel.
    self.start_button = ctk.CTkButton(controls, text="开始处理", command=self.start_processing)
    self.start_button.grid(row=4, column=0, columnspan=3, padx=5, pady=20)

    # Row 5: progress bar, starting empty.
    ctk.CTkLabel(controls, text="处理进度:").grid(row=5, column=0, **left_cell)
    self.progress_var = ctk.DoubleVar()
    self.progress_bar = ctk.CTkProgressBar(controls, variable=self.progress_var)
    self.progress_bar.grid(row=5, column=1, columnspan=2, sticky=ctk.EW, **cell)
    self.progress_bar.set(0)

    # ---- right: log panel ----
    log_panel = ctk.CTkFrame(self.main_frame, fg_color="transparent")
    log_panel.pack(side=ctk.RIGHT, fill=ctk.BOTH, expand=True, padx=10, pady=10)

    self.log_text = ctk.CTkTextbox(log_panel, width=500, height=400)
    self.log_text.pack(fill=ctk.BOTH, expand=True, **cell)

    # Attach a handler so INFO-and-above records from `logger` go to the text box.
    handler = LogTextHandler(self.log_text)
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    self.log_handler = handler
    logger.addHandler(handler)
def init_config_frame(self):
    """Build the settings tab: one nested tab per settings group plus a save-all button."""
    tabs = ctk.CTkTabview(self.config_frame)
    tabs.pack(fill=ctk.BOTH, expand=True, padx=5, pady=5)

    # (tab title, method that fills the tab), in display order.
    sections = (
        ("常规设置", self.init_general_config),
        ("数据库设置", self.init_database_config),
        ("Dify设置", self.init_dify_config),
        ("Coze设置", self.init_coze_config),
        ("百度API设置", self.init_baidu_config),
        ("图片处理设置", self.init_image_config),
        ("违禁词设置", self.init_keywords_config),
    )

    # Create every tab first, then populate them in the same order.
    frames = [tabs.add(title) for title, _ in sections]
    for (_, populate), frame in zip(sections, frames):
        populate(frame)

    # Button that saves every settings group.
    save_all = ctk.CTkButton(self.config_frame, text="保存所有配置", command=self.save_all_configs)
    save_all.pack(side=ctk.RIGHT, padx=10, pady=10)
def init_general_config(self, parent):
    """Fill the "General" settings tab: paths, default Excel file and numeric limits.

    Args:
        parent: Container widget the rows are gridded into.
    """
    general = CONFIG['General']
    cell = {"padx": 5, "pady": 5}

    def caption(row, text):
        # Left-aligned label in column 0.
        ctk.CTkLabel(parent, text=text).grid(row=row, column=0, sticky=ctk.W, **cell)

    def wide_entry(row, variable):
        # 200px entry in column 1.
        ctk.CTkEntry(parent, textvariable=variable, width=200).grid(row=row, column=1, **cell)

    def narrow_entry(row, variable):
        # 50px left-aligned entry in column 1.
        ctk.CTkEntry(parent, textvariable=variable, width=50).grid(
            row=row, column=1, sticky=ctk.W, **cell)

    def browse_button(row, on_click):
        # "Browse" button in column 2.
        ctk.CTkButton(parent, text="浏览", command=on_click, width=60).grid(
            row=row, column=2, **cell)

    # Row 0: Chrome user data directory.
    caption(0, "Chrome用户目录:")
    self.chrome_dir_var = ctk.StringVar(value=general['chrome_user_dir'])
    wide_entry(0, self.chrome_dir_var)
    browse_button(0, lambda: self.browse_directory(self.chrome_dir_var))

    # Row 1: article output directory.
    caption(1, "文章保存路径:")
    self.articles_path_var = ctk.StringVar(value=general['articles_path'])
    wide_entry(1, self.articles_path_var)
    browse_button(1, lambda: self.browse_directory(self.articles_path_var))

    # Row 2: image output directory.
    caption(2, "图片保存路径:")
    self.images_path_var = ctk.StringVar(value=general['images_path'])
    wide_entry(2, self.images_path_var)
    browse_button(2, lambda: self.browse_directory(self.images_path_var))

    # Row 3: default Excel file.
    caption(3, "默认Excel文件:")
    self.excel_file_var = ctk.StringVar(value=general['title_file'])
    wide_entry(3, self.excel_file_var)
    browse_button(3, lambda: self.browse_file(
        self.excel_file_var, [("Excel文件", "*.xlsx"), ("所有文件", "*.*")]))

    # Row 4: maximum thread count.
    caption(4, "最大线程数:")
    self.max_threads_var = ctk.StringVar(value=general['max_threads'])
    narrow_entry(4, self.max_threads_var)

    # Row 5: minimum article length, defaulting to '100' when not configured.
    caption(5, "最小文章字数:")
    self.min_article_length_var = ctk.StringVar(value=general.get('min_article_length', '100'))
    narrow_entry(5, self.min_article_length_var)

    # Row 6: save button for this tab.
    save_button = ctk.CTkButton(parent, text="保存配置", command=self.save_general_config)
    save_button.grid(row=6, column=1, padx=5, pady=10, sticky=ctk.E)
def init_database_config(self, parent):
    """Fill the database settings tab with connection fields and test/save buttons.

    Args:
        parent: Container widget the rows are gridded into.
    """
    db_settings = CONFIG['Database']

    # (label text, attribute that stores the variable, config key, extra entry options)
    rows = (
        ("数据库主机:", "db_host_var", "host", {}),
        ("数据库用户名:", "db_user_var", "user", {}),
        ("数据库密码:", "db_password_var", "password", {"show": "*"}),  # masked input
        ("数据库名称:", "db_name_var", "database", {}),
    )
    for row, (label_text, attr_name, key, entry_options) in enumerate(rows):
        ctk.CTkLabel(parent, text=label_text).grid(row=row, column=0, padx=5, pady=5, sticky=ctk.W)
        variable = ctk.StringVar(value=db_settings[key])
        setattr(self, attr_name, variable)
        field = ctk.CTkEntry(parent, textvariable=variable, width=150, **entry_options)
        field.grid(row=row, column=1, padx=5, pady=5)

    # Connection test button.
    test_button = ctk.CTkButton(parent, text="测试连接", command=self.test_db_connection)
    test_button.grid(row=4, column=1, padx=5, pady=5, sticky=ctk.E)

    # Save button for this tab.
    save_button = ctk.CTkButton(parent, text="保存配置", command=self.save_database_config)
    save_button.grid(row=5, column=1, padx=5, pady=10, sticky=ctk.E)
def init_dify_config(self, parent):
    """Fill the Dify settings tab: API key, user id, URL and the input-data template.

    Args:
        parent: Container widget the rows are gridded into.
    """
    dify = CONFIG['Dify']
    cell = {"padx": 5, "pady": 5}

    # Row 0: API key.
    ctk.CTkLabel(parent, text="API Key:").grid(row=0, column=0, sticky=ctk.W, **cell)
    self.dify_api_key_var = ctk.StringVar(value=dify['api_key'])
    ctk.CTkEntry(parent, textvariable=self.dify_api_key_var, width=200).grid(row=0, column=1, **cell)

    # Row 1: user id.
    ctk.CTkLabel(parent, text="User ID:").grid(row=1, column=0, sticky=ctk.W, **cell)
    self.dify_user_id_var = ctk.StringVar(value=dify['user_id'])
    ctk.CTkEntry(parent, textvariable=self.dify_user_id_var, width=150).grid(row=1, column=1, **cell)

    # Row 2: endpoint URL.
    ctk.CTkLabel(parent, text="URL:").grid(row=2, column=0, sticky=ctk.W, **cell)
    self.dify_url_var = ctk.StringVar(value=dify['url'])
    ctk.CTkEntry(parent, textvariable=self.dify_url_var, width=200).grid(row=2, column=1, **cell)

    # Row 3: input-data template.
    # The fallback used to read '{"old_article": "{article_text"}' — the placeholder's
    # closing brace and the closing quote were transposed, leaving an unterminated
    # "{article_text" token. The fallback is now a well-formed placeholder.
    # NOTE(review): assumes the consumer substitutes "{article_text}" — confirm.
    ctk.CTkLabel(parent, text="Input Data模板:").grid(row=3, column=0, sticky=ctk.W, **cell)
    self.dify_input_data_template_var = ctk.StringVar(
        value=dify.get('input_data_template', '{"old_article": "{article_text}"}'))
    ctk.CTkEntry(parent, textvariable=self.dify_input_data_template_var, width=200).grid(
        row=3, column=1, **cell)

    # Row 4: save button for this tab.
    ctk.CTkButton(parent, text="保存配置", command=self.save_dify_config).grid(
        row=4, column=1, padx=5, pady=10, sticky=ctk.E)
def init_coze_config(self, parent):
    """Build the Coze tab: type selector, template manager and the template form.

    Args:
        parent: Container widget the three sections are gridded into.
    """
    cell = {"padx": 5, "pady": 5}

    # ---- row 0: generation type selector and edit-status label ----
    selector_row = ctk.CTkFrame(parent, fg_color="transparent")
    selector_row.grid(row=0, column=0, columnspan=3, sticky=ctk.EW, **cell)
    ctk.CTkLabel(selector_row, text="生成类型:").pack(side=ctk.LEFT, padx=5)
    self.coze_generation_type_var = ctk.StringVar(value="短篇")
    type_picker = ctk.CTkComboBox(
        selector_row,
        variable=self.coze_generation_type_var,
        values=["短篇", "文章"],
        width=120,
        state="readonly",
    )
    self.coze_generation_type_combo = type_picker
    type_picker.pack(side=ctk.LEFT, padx=5)
    type_picker.configure(command=self.on_coze_generation_type_changed)

    # Label used by the template actions to report state.
    status_label = ctk.CTkLabel(selector_row, text="", text_color="blue")
    self.edit_status_label = status_label
    status_label.pack(side=ctk.LEFT, padx=20)

    # Load stored templates, then start watching the form variables for edits.
    self.load_templates()
    self._setup_var_trace()

    # ---- row 1: template manager ----
    manager = ctk.CTkFrame(parent)
    manager.grid(row=1, column=0, columnspan=3, padx=5, pady=10, sticky=ctk.EW)

    heading = ctk.CTkLabel(manager, text="模板管理", font=ctk.CTkFont(size=14, weight="bold"))
    heading.grid(row=0, column=0, columnspan=2, **cell)

    # A scrollable frame of buttons serves as the template list.
    list_holder = ctk.CTkFrame(manager, fg_color="transparent")
    list_holder.grid(row=1, column=0, sticky=ctk.NSEW, **cell)
    self.template_list_frame = ctk.CTkScrollableFrame(list_holder, width=250, height=150)
    self.template_list_frame.pack(fill=ctk.BOTH, expand=True)

    # One button per template; nothing is selected initially.
    self.template_buttons = []
    self.selected_template_index = None

    # Column of template actions, created top to bottom.
    actions = ctk.CTkFrame(manager, fg_color="transparent")
    actions.grid(row=1, column=1, padx=10, pady=5, sticky=ctk.N)
    for caption, handler in (
        ("新增模板", self.add_template),
        ("删除模板", self.delete_template),
        ("重命名模板", self.rename_template),
        ("保存模板", self.save_template),
        ("复制模板", self.duplicate_template),
        ("使用模板", self.use_template),
    ):
        ctk.CTkButton(actions, text=caption, command=handler, width=100).pack(pady=2)

    # ---- row 2: form for the current template ----
    form = ctk.CTkFrame(parent, fg_color="transparent")
    form.grid(row=2, column=0, columnspan=3, padx=5, pady=10, sticky=ctk.EW)

    ctk.CTkLabel(form, text="模板名称:").grid(row=0, column=0, sticky=ctk.W, **cell)
    ctk.CTkEntry(form, textvariable=self.template_name_var, width=150).grid(row=0, column=1, **cell)

    ctk.CTkLabel(form, text="Workflow ID:").grid(row=1, column=0, sticky=ctk.W, **cell)
    ctk.CTkEntry(form, textvariable=self.coze_workflow_id_var, width=200).grid(row=1, column=1, **cell)

    ctk.CTkLabel(form, text="Access Token:").grid(row=2, column=0, sticky=ctk.W, **cell)
    ctk.CTkEntry(form, textvariable=self.coze_access_token_var, width=200).grid(row=2, column=1, **cell)

    ctk.CTkLabel(form, text="Is Async:").grid(row=3, column=0, sticky=ctk.W, **cell)
    async_picker = ctk.CTkComboBox(
        form, variable=self.coze_is_async_var, values=["true", "false"], width=120, state="readonly")
    async_picker.grid(row=3, column=1, sticky=ctk.W, **cell)

    # The save button sits on row 5 (row 4 is left empty).
    ctk.CTkButton(form, text="保存配置", command=self.save_coze_config).grid(
        row=5, column=1, padx=5, pady=10, sticky=ctk.E)

    # Render the list for the current type, then restore the last used template.
    self.update_template_list()
    self.load_last_used_template()
def init_baidu_config(self, parent):
    """Fill the Baidu API tab: API key, secret key and the violation-detection switch.

    Args:
        parent: Container widget the rows are gridded into.
    """
    baidu = CONFIG['Baidu']
    cell = {"padx": 5, "pady": 5}
    left_cell = dict(cell, sticky=ctk.W)

    # Row 0: API key.
    ctk.CTkLabel(parent, text="API Key:").grid(row=0, column=0, **left_cell)
    self.baidu_api_key_var = ctk.StringVar(value=baidu['api_key'])
    api_key_entry = ctk.CTkEntry(parent, textvariable=self.baidu_api_key_var, width=200)
    api_key_entry.grid(row=0, column=1, **cell)

    # Row 1: secret key.
    ctk.CTkLabel(parent, text="Secret Key:").grid(row=1, column=0, **left_cell)
    self.baidu_secret_key_var = ctk.StringVar(value=baidu['secret_key'])
    secret_entry = ctk.CTkEntry(parent, textvariable=self.baidu_secret_key_var, width=200)
    secret_entry.grid(row=1, column=1, **cell)

    # Row 2: detection on/off, defaulting to 'false' when not configured.
    ctk.CTkLabel(parent, text="启用违规检测:").grid(row=2, column=0, **left_cell)
    self.baidu_enable_detection_var = ctk.StringVar(value=baidu.get('enable_detection', 'false'))
    detection_picker = ctk.CTkComboBox(
        parent,
        variable=self.baidu_enable_detection_var,
        values=["true", "false"],
        width=120,
        state="readonly",
    )
    detection_picker.grid(row=2, column=1, **left_cell)

    # Row 3: save button for this tab.
    save_button = ctk.CTkButton(parent, text="保存配置", command=self.save_baidu_config)
    save_button.grid(row=3, column=1, padx=5, pady=10, sticky=ctk.E)
def init_image_config(self, parent):
    """Fill the image-processing tab: crop/rotation/brightness on the left, watermark on the right.

    Args:
        parent: Container widget the fields are gridded into.
    """
    image_settings = CONFIG['ImageModify']

    # (label text, attribute name, config key, row, label column, entry width, left-align entry)
    fields = (
        ("裁剪百分比:", "crop_percent_var", "crop_percent", 0, 0, 50, True),
        ("最小旋转角度:", "min_rotation_var", "min_rotation", 1, 0, 50, True),
        ("最大旋转角度:", "max_rotation_var", "max_rotation", 2, 0, 50, True),
        ("最小亮度:", "min_brightness_var", "min_brightness", 3, 0, 50, True),
        ("最大亮度:", "max_brightness_var", "max_brightness", 4, 0, 50, True),
        ("水印文字:", "watermark_text_var", "watermark_text", 0, 2, 150, False),
        ("水印透明度:", "watermark_opacity_var", "watermark_opacity", 1, 2, 50, True),
        ("蒙版透明度:", "overlay_opacity_var", "overlay_opacity", 2, 2, 50, True),
    )
    for label_text, attr_name, key, row, label_column, width, align_left in fields:
        ctk.CTkLabel(parent, text=label_text).grid(
            row=row, column=label_column, padx=5, pady=5, sticky=ctk.W)
        variable = ctk.StringVar(value=image_settings[key])
        setattr(self, attr_name, variable)
        placement = {"row": row, "column": label_column + 1, "padx": 5, "pady": 5}
        if align_left:
            placement["sticky"] = ctk.W
        ctk.CTkEntry(parent, textvariable=variable, width=width).grid(**placement)

    # Preview button.
    preview_button = ctk.CTkButton(parent, text="预览效果", command=self.preview_image_effect)
    preview_button.grid(row=4, column=3, padx=5, pady=5, sticky=ctk.E)

    # Save button for this tab.
    save_button = ctk.CTkButton(parent, text="保存配置", command=self.save_image_config)
    save_button.grid(row=5, column=3, padx=5, pady=10, sticky=ctk.E)
def init_keywords_config(self, parent):
    """Fill the banned-words tab with an editable list (one word per line) and a save button.

    Args:
        parent: Container widget the editor is gridded into.
    """
    ctk.CTkLabel(parent, text="违禁词列表:").grid(row=0, column=0, padx=5, pady=5, sticky=ctk.W)

    # The config stores the words comma-separated; show them one per line.
    editor = ctk.CTkTextbox(parent, width=500, height=300)
    self.banned_words_text = editor
    editor.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky=ctk.NSEW)
    stored_words = CONFIG['Keywords']['banned_words']
    editor.insert("1.0", '\n'.join(stored_words.split(',')))

    save_button = ctk.CTkButton(parent, text="保存违禁词", command=self.save_banned_words)
    save_button.grid(row=2, column=1, padx=5, pady=5, sticky=ctk.E)

    # Give the editor's row and first column all the spare space.
    parent.columnconfigure(0, weight=1)
    parent.rowconfigure(1, weight=1)
def init_disclaimer_frame(self):
    """Fill the disclaimer tab with a read-only notice and an acknowledge button."""
    container = ctk.CTkFrame(self.disclaimer_frame, fg_color="transparent")
    container.pack(fill=ctk.BOTH, expand=True, padx=20, pady=20)

    # Heading.
    heading = ctk.CTkLabel(container, text="免责声明", font=ctk.CTkFont(size=16, weight="bold"))
    heading.pack(pady=10)

    # Notice text.
    notice_box = ctk.CTkTextbox(container, width=700, height=400, wrap="word")
    notice_box.pack(fill=ctk.BOTH, expand=True, padx=10, pady=10)
    notice_box.insert("1.0", """
软件使用免责声明
1. 合法使用声明
本软件仅供合法、正当用途使用。用户应当遵守中华人民共和国相关法律法规,不得将本软件用于任何违法犯罪活动。
2. 内容责任声明
用户通过本软件生成、处理或发布的所有内容,其版权归属、合法性及内容真实性由用户自行负责。本软件开发者不对用户使用本软件处理的内容承担任何法律责任。
3. 使用风险声明
用户应自行承担使用本软件的风险。本软件按"现状"提供,不提供任何明示或暗示的保证,包括但不限于适销性、特定用途适用性和非侵权性的保证。
4. 禁止用途
严禁将本软件用于以下活动:
- 违反国家法律法规的活动
- 侵犯他人知识产权或其他合法权益的活动
- 传播虚假、欺诈或误导性信息的活动
- 从事任何可能危害国家安全、社会稳定的活动
- 其他违背社会公德、商业道德的活动
5. 责任限制
在法律允许的最大范围内,对于因使用或无法使用本软件而导致的任何直接、间接、偶然、特殊、惩罚性或后果性损害,本软件开发者不承担任何责任。
6. 协议更新
本免责声明可能会不定期更新,更新后的内容将在软件中公布,不再另行通知。用户继续使用本软件即表示接受修改后的免责声明。
7. 最终解释
本免责声明的最终解释权归本软件开发者所有。
""")
    # Make the text read-only once it has been inserted.
    notice_box.configure(state="disabled")

    def go_to_main_tab():
        # Acknowledging the notice switches back to the main tab.
        self.notebook.set("主页面")

    footer = ctk.CTkFrame(container, fg_color="transparent")
    footer.pack(pady=10)
    ctk.CTkButton(footer, text="我已阅读并同意以上声明", command=go_to_main_tab).pack()
def save_all_configs(self):
    """Run every per-section save in order, then report overall success.

    An exception escaping any of the section savers is shown in an error dialog.
    """
    try:
        section_savers = (
            self.save_general_config,
            self.save_database_config,
            self.save_dify_config,
            self.save_coze_config,
            self.save_baidu_config,
            self.save_image_config,
            self.save_banned_words,
        )
        for save_section in section_savers:
            save_section()
        messagebox.showinfo("保存成功", "所有配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存配置时出错:{e}")
def on_coze_generation_type_changed(self, choice=None):
    """Handle a generation-type change made on the Coze settings page.

    Args:
        choice: Value passed by the combo box callback. Unused; the bound
            variable is read instead.
    """
    # Mirror the choice onto the main page, the counterpart of what
    # on_generation_type_changed does in the other direction. Previously this
    # sync only ran as a side effect of saving the banned-word list.
    self.generation_type_var.set(self.coze_generation_type_var.get())
    # The remembered index refers to the previous type's list. Keeping it would let
    # a later save overwrite whichever template sits at that position in the new list.
    self.selected_template_index = None
    self.update_template_list()
def save_general_config(self):
    """Persist the "General" settings and refresh this module's path/thread globals.

    The thread count is parsed before anything is written, so a non-numeric value
    is reported without being saved. The article and image directories are created
    when missing. Failures are shown in an error dialog instead of being raised.
    """
    global USER_DIR_PATH, ARTICLES_BASE_PATH, IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS
    try:
        # Validate first. Previously the raw text was saved and only then passed to
        # int(), which left an unparsable value on disk when the conversion failed.
        max_threads_text = self.max_threads_var.get()
        max_threads = int(max_threads_text)

        CONFIG['General']['chrome_user_dir'] = self.chrome_dir_var.get()
        CONFIG['General']['articles_path'] = self.articles_path_var.get()
        CONFIG['General']['images_path'] = self.images_path_var.get()
        CONFIG['General']['title_file'] = self.excel_file_var.get()
        CONFIG['General']['max_threads'] = max_threads_text
        CONFIG['General']['min_article_length'] = self.min_article_length_var.get()
        save_config(CONFIG)

        # Update this module's globals with the saved values.
        USER_DIR_PATH = CONFIG['General']['chrome_user_dir']
        ARTICLES_BASE_PATH = CONFIG['General']['articles_path']
        IMGS_BASE_PATH = CONFIG['General']['images_path']
        TITLE_BASE_PATH = CONFIG['General']['title_file']
        MAX_THREADS = max_threads

        # Create the output directories; exist_ok covers another process creating
        # the directory between the check and the call.
        for directory in (ARTICLES_BASE_PATH, IMGS_BASE_PATH):
            if not os.path.exists(directory):
                os.makedirs(directory, exist_ok=True)

        messagebox.showinfo("保存成功", "常规配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存常规配置时出错:{e}")
def save_database_config(self):
    """Copy the database form fields into CONFIG and persist them.

    Success and failure are both reported through a message box.
    """
    # (config key, attribute holding the form variable), written in this order.
    field_map = (
        ('host', 'db_host_var'),
        ('user', 'db_user_var'),
        ('password', 'db_password_var'),
        ('database', 'db_name_var'),
    )
    try:
        for key, attr_name in field_map:
            CONFIG['Database'][key] = getattr(self, attr_name).get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "数据库配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存数据库配置时出错:{e}")
def save_dify_config(self):
    """Copy the Dify form fields into CONFIG and persist them.

    Success and failure are both reported through a message box.
    """
    # (config key, attribute holding the form variable), written in this order.
    field_map = (
        ('api_key', 'dify_api_key_var'),
        ('user_id', 'dify_user_id_var'),
        ('url', 'dify_url_var'),
        ('input_data_template', 'dify_input_data_template_var'),
    )
    try:
        for key, attr_name in field_map:
            CONFIG['Dify'][key] = getattr(self, attr_name).get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "Dify配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存Dify配置时出错{e}")
def save_coze_config(self):
    """Save the Coze form into the selected template (if any) and the global config.

    With no template selected, only the global Coze settings are written. With a
    selection, the template is updated and saved, the global settings are updated
    too, and the template list is re-rendered so a changed name shows immediately.
    Blank or duplicate names are rejected, matching save_template. Failures are
    shown in an error dialog instead of being raised.
    """
    try:
        workflow_id = self.coze_workflow_id_var.get()
        access_token = self.coze_access_token_var.get()
        is_async = self.coze_is_async_var.get()

        def store_global_config():
            # Write the form values as the global Coze settings and persist them.
            CONFIG['Coze']['workflow_id'] = workflow_id
            CONFIG['Coze']['access_token'] = access_token
            CONFIG['Coze']['is_async'] = is_async
            save_config(CONFIG)

        index = self.selected_template_index
        if index is None:
            store_global_config()
            messagebox.showinfo("保存成功", "Coze全局配置已保存")
            return

        current_type = self.coze_generation_type_var.get()
        if current_type not in self.templates or index >= len(self.templates[current_type]):
            messagebox.showerror("错误", "无效的模板选择")
            return

        siblings = self.templates[current_type]
        template = siblings[index]

        # Same name rules as save_template: not blank, unique within the type.
        new_name = self.template_name_var.get().strip()
        if not new_name:
            messagebox.showwarning("输入无效", "模板名称不能为空。")
            return
        if any(other is not template and other['name'] == new_name for other in siblings):
            messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。")
            return

        template['name'] = new_name
        template['workflow_id'] = workflow_id
        template['access_token'] = access_token
        template['is_async'] = is_async
        self.save_templates()
        store_global_config()

        # Re-render the list so a renamed template's button shows the new name, then
        # restore the highlight that the rebuild drops.
        self.update_template_list()
        if index < len(self.template_buttons):
            self.template_buttons[index].configure(fg_color=("gray75", "gray25"))

        self.edit_status_label.configure(text="已保存", text_color="green")
        self.after(2000, lambda: self.edit_status_label.configure(text=""))
        messagebox.showinfo("保存成功", f"模板 '{template['name']}' 配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存Coze配置时出错{e}")
def save_baidu_config(self):
    """Copy the Baidu API form fields into CONFIG and persist them.

    Success and failure are both reported through a message box.
    """
    # (config key, attribute holding the form variable), written in this order.
    field_map = (
        ('api_key', 'baidu_api_key_var'),
        ('secret_key', 'baidu_secret_key_var'),
        ('enable_detection', 'baidu_enable_detection_var'),
    )
    try:
        for key, attr_name in field_map:
            CONFIG['Baidu'][key] = getattr(self, attr_name).get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "百度API配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存百度API配置时出错{e}")
def save_image_config(self):
    """Copy the image-processing form fields into CONFIG and persist them.

    Success and failure are both reported through a message box.
    """
    # Each config key is stored in the attribute named "<key>_var".
    keys = (
        'crop_percent',
        'min_rotation',
        'max_rotation',
        'min_brightness',
        'max_brightness',
        'watermark_text',
        'watermark_opacity',
        'overlay_opacity',
    )
    try:
        for key in keys:
            CONFIG['ImageModify'][key] = getattr(self, f"{key}_var").get()
        save_config(CONFIG)
        messagebox.showinfo("保存成功", "图片处理配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存图片处理配置时出错:{e}")
def save_banned_words(self):
    """Persist the banned-word editor content as a comma-separated config value.

    The editor holds one word per line. Each line is trimmed and blank lines are
    dropped, so the stored list never contains empty entries. Exceptions propagate
    to the caller, as before.
    """
    editor_text = self.banned_words_text.get("1.0", "end-1c")
    # Previously newlines were simply replaced with commas, so a blank line produced
    # an empty entry ("a,,b").
    words = [line.strip() for line in editor_text.split('\n')]
    CONFIG['Keywords']['banned_words'] = ','.join(word for word in words if word)
    save_config(CONFIG)
    messagebox.showinfo("保存成功", "违禁词列表已更新")
    # The generation-type sync and template-list rebuild that used to follow here were
    # unrelated to banned words and have been removed.
def on_generation_type_changed(self, choice=None):
    """Handle a generation-type change made on the main page.

    Args:
        choice: Value passed by the combo box callback. Unused; the bound
            variable is read instead.
    """
    # Mirror the choice onto the Coze settings page.
    self.coze_generation_type_var.set(self.generation_type_var.get())
    # The remembered index refers to the previous type's list. Keeping it would let
    # a later save overwrite whichever template sits at that position in the new list.
    self.selected_template_index = None
    self.update_template_list()
def update_template_list(self):
    """Re-render the template buttons for the type selected on the Coze page."""
    active_type = self.coze_generation_type_var.get()

    # Destroy the buttons from the previous render and empty the list in place.
    rendered = self.template_buttons
    for old_button in rendered:
        old_button.destroy()
    del rendered[:]

    if active_type not in self.templates:
        return

    # One transparent, left-aligned button per template; each remembers its position.
    for position, entry in enumerate(self.templates[active_type]):
        button = ctk.CTkButton(
            self.template_list_frame,
            text=entry['name'],
            command=lambda chosen=position: self.on_template_button_click(chosen),
            fg_color="transparent",
            border_width=2,
            anchor="w",
        )
        button.pack(fill=ctk.X, padx=5, pady=2)
        self.template_buttons.append(button)
def on_template_button_click(self, index):
    """Select the template at ``index``: highlight it, load it and record it as last used.

    Args:
        index: Position of the template within the current type's list.
    """
    buttons = self.template_buttons

    # Clear every highlight, then mark the chosen button if it exists.
    for button in buttons:
        button.configure(fg_color="transparent")
    if index < len(buttons):
        buttons[index].configure(fg_color=("gray75", "gray25"))

    self.selected_template_index = index

    # Nothing to load when the index does not point into the current type's list.
    active_type = self.coze_generation_type_var.get()
    if active_type not in self.templates or index >= len(self.templates[active_type]):
        return

    chosen = self.templates[active_type][index]
    self.load_template_config(chosen)

    # Persist which template was used last.
    CONFIG['Coze']['last_used_template'] = chosen['name']
    CONFIG['Coze']['last_used_template_type'] = active_type
    save_config(CONFIG)
def load_template_config(self, template):
    """Copy a template's values into the form without flagging them as unsaved edits.

    Args:
        template: Mapping with a required 'name' and optional 'workflow_id',
            'access_token' and 'is_async' entries.
    """
    # Stop watching the variables while they are set programmatically.
    self._unbind_var_trace()

    form_values = (
        (self.template_name_var, template['name']),
        (self.coze_workflow_id_var, template.get('workflow_id', '')),
        (self.coze_access_token_var, template.get('access_token', '')),
        (self.coze_is_async_var, template.get('is_async', 'true')),
    )
    for variable, value in form_values:
        variable.set(value)

    # Show "loaded" for two seconds.
    status = self.edit_status_label
    status.configure(text="已加载", text_color="blue")
    self.after(2000, lambda: self.edit_status_label.configure(text=""))

    # Resume watching for user edits.
    self._setup_var_trace()
def _setup_var_trace(self):
    """Watch the four template form variables so any write shows the edit status."""
    watched = (
        self.template_name_var,
        self.coze_workflow_id_var,
        self.coze_access_token_var,
        self.coze_is_async_var,
    )
    # Remember (variable, trace id) pairs so _unbind_var_trace can remove them.
    registered = self.var_traces = []
    for tk_var in watched:
        callback_id = tk_var.trace_add('write', lambda *_ignored: self._show_edit_status())
        registered.append((tk_var, callback_id))
def _unbind_var_trace(self):
    """Remove every registered write trace and forget the registrations."""
    # Nothing to do before the first _setup_var_trace call.
    if not hasattr(self, 'var_traces'):
        return
    for tk_var, callback_id in self.var_traces:
        try:
            tk_var.trace_remove('write', callback_id)
        except Exception:
            # Best effort: a failed removal is ignored.
            pass
    self.var_traces = []
def _show_edit_status(self):
    """Show the red "unsaved" marker in the edit-status label."""
    status_label = self.edit_status_label
    status_label.configure(text="未保存", text_color="red")
def add_template(self):
    """Ask for a name and append a new, empty template to the current type.

    The new template is persisted and selected, and an "added" status is shown.
    Cancelling the dialog, a blank name, or a name already used within the type
    aborts with a message box.
    """
    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates:
        self.templates[current_type] = []
    siblings = self.templates[current_type]

    new_template_name = simpledialog.askstring("新增模板", "请输入新模板的名称:")
    # askstring returns None only when the dialog is cancelled. An empty string means
    # the user confirmed without typing; previously that was reported as a
    # cancellation instead of an invalid name.
    if new_template_name is None:
        messagebox.showinfo("取消操作", "已取消新增模板。")
        return

    new_template_name = new_template_name.strip()
    if not new_template_name:
        messagebox.showwarning("输入无效", "模板名称不能为空。")
        return

    # Names must be unique within a generation type.
    if any(t['name'] == new_template_name for t in siblings):
        messagebox.showwarning("名称重复", f"模板名称 '{new_template_name}' 已存在,请使用其他名称。")
        return

    siblings.append({
        'name': new_template_name,
        'workflow_id': '',
        'access_token': '',
        'is_async': 'true',
    })
    self.update_template_list()
    self.save_templates()

    # Select the template that was just appended.
    self.on_template_button_click(len(siblings) - 1)

    # Selecting sets its own status text; set "added" slightly later so it wins.
    self.after(100, lambda: self.edit_status_label.configure(text="已添加", text_color="green"))
    self.after(2100, lambda: self.edit_status_label.configure(text=""))
def delete_template(self):
    """Delete the selected template after confirmation.

    The list is re-rendered and persisted, the form is cleared, and the last
    remaining template (if any) becomes the new selection. A "deleted" status is
    shown afterwards.
    """
    index = self.selected_template_index
    if index is None:
        messagebox.showwarning("提示", "请先选择要删除的模板")
        return

    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates or index >= len(self.templates[current_type]):
        return

    siblings = self.templates[current_type]
    template_name = siblings[index]['name']
    if not messagebox.askyesno("确认删除", f"确定要删除模板 '{template_name}' 吗?"):
        return

    del siblings[index]
    self.update_template_list()
    self.save_templates()

    # Clear the form and drop the now-invalid selection.
    self.clear_template_config()
    self.selected_template_index = None

    # Fall back to the last remaining template, if there is one.
    if siblings:
        self.on_template_button_click(len(siblings) - 1)

    # Selecting a template sets its own status text, which used to overwrite the
    # "deleted" message at once. Set it slightly later, as add_template and
    # duplicate_template do.
    self.after(100, lambda: self.edit_status_label.configure(
        text=f"已删除 '{template_name}'", text_color="red"))
    self.after(2100, lambda: self.edit_status_label.configure(text=""))
def validate_template(self):
    """Check that name, workflow id and access token are all non-blank.

    Returns:
        True when every field has content after trimming; otherwise False, after
        showing an error dialog for the first blank field.
    """
    # (trimmed value, error message), checked in this order.
    required = (
        (self.template_name_var.get().strip(), "模板名称不能为空"),
        (self.coze_workflow_id_var.get().strip(), "Workflow ID不能为空"),
        (self.coze_access_token_var.get().strip(), "Access Token不能为空"),
    )
    for value, complaint in required:
        if not value:
            messagebox.showerror("错误", complaint)
            return False
    return True
def save_template(self):
    """Validate the form and store it in the selected template.

    The template list and config are persisted, the template is recorded as last
    used, and a "saved" status is shown. Nothing is saved when validation fails,
    when no template is selected, or when the new name is already used by another
    template of the same type.
    """
    # validate_template already rejects a blank name, workflow id or token, so the
    # extra blank-name check that used to follow could never fire and was removed.
    if not self.validate_template():
        return

    index = self.selected_template_index
    if index is None:
        messagebox.showwarning("未选择模板", "请先选择要保存的模板")
        return

    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates or index >= len(self.templates[current_type]):
        return

    siblings = self.templates[current_type]
    template = siblings[index]
    new_name = self.template_name_var.get().strip()

    # A changed name must not collide with another template of this type.
    if new_name != template['name'] and any(t['name'] == new_name for t in siblings):
        messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。")
        return

    template['name'] = new_name
    template['workflow_id'] = self.coze_workflow_id_var.get().strip()
    template['access_token'] = self.coze_access_token_var.get().strip()
    template['is_async'] = self.coze_is_async_var.get()

    # Record this template as the last one used.
    CONFIG['Coze']['last_used_template'] = template['name']
    CONFIG['Coze']['last_used_template_type'] = current_type

    self.update_template_list()
    # Rebuilding the list creates un-highlighted buttons; restore the highlight so
    # the still-selected template stays visibly selected.
    if index < len(self.template_buttons):
        self.template_buttons[index].configure(fg_color=("gray75", "gray25"))

    self.save_templates()
    save_config(CONFIG)
    self.edit_status_label.configure(text="已保存", text_color="green")
    self.after(2000, lambda: self.edit_status_label.configure(text=""))
def rename_template(self):
    """Ask for a new name and rename the selected template.

    The list is re-rendered and persisted, the template is re-selected, and a
    "renamed" status is shown. Cancelling, a blank name, an unchanged name or a
    name already used within the type aborts with a message box.
    """
    index = self.selected_template_index
    if index is None:
        messagebox.showwarning("未选择模板", "请先选择要重命名的模板")
        return

    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates or index >= len(self.templates[current_type]):
        return

    siblings = self.templates[current_type]
    template = siblings[index]
    old_name = template['name']

    new_name = simpledialog.askstring("重命名模板", "请输入新的模板名称:", initialvalue=old_name)
    # askstring returns None only when the dialog is cancelled. An empty string means
    # the user cleared the field and confirmed; previously that was reported as a
    # cancellation instead of an invalid name.
    if new_name is None:
        messagebox.showinfo("取消操作", "已取消重命名模板。")
        return

    new_name = new_name.strip()
    if not new_name:
        messagebox.showwarning("输入无效", "模板名称不能为空。")
        return
    if new_name == old_name:
        messagebox.showinfo("未修改", "新名称与旧名称相同,无需重命名。")
        return

    # Compare by identity: "another template" means a different entry, not merely a
    # dict with different content.
    if any(other is not template and other['name'] == new_name for other in siblings):
        messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。")
        return

    template['name'] = new_name
    self.update_template_list()
    self.save_templates()

    # Re-select the renamed template.
    self.on_template_button_click(index)
    self.edit_status_label.configure(text="已重命名", text_color="green")
    self.after(2000, lambda: self.edit_status_label.configure(text=""))
def duplicate_template(self):
    """Clone the selected template and select the clone.

    The copy is shallow and gets the name "<name>_副本", or
    "<name>_副本N" (N starting at 2) when that name is already taken
    within the same template type. The clone is appended to the list,
    the templates are saved and a short "copied" status is shown.
    """
    if self.selected_template_index is None:
        messagebox.showwarning("提示", "请先选择要复制的模板")
        return

    position = self.selected_template_index
    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates or position >= len(self.templates[current_type]):
        return

    source_template = self.templates[current_type][position]
    clone = source_template.copy()

    # Find the first free "<base>_副本[N]" name among the sibling templates.
    base_name = source_template['name']
    taken_names = [t['name'] for t in self.templates[current_type]]
    suffix = 1
    candidate = f"{base_name}_副本"
    while candidate in taken_names:
        suffix += 1
        candidate = f"{base_name}_副本{suffix}"
    clone['name'] = candidate

    self.templates[current_type].append(clone)
    self.update_template_list()
    self.save_templates()

    # Select the clone, which is now the last entry of the list.
    self.on_template_button_click(len(self.templates[current_type]) - 1)

    # Delayed so the status is not overwritten by on_template_button_click.
    self.after(100, lambda: self.edit_status_label.configure(text="已复制", text_color="green"))
    self.after(2100, lambda: self.edit_status_label.configure(text=""))
def use_template(self):
    """Open a modal template picker and apply the chosen template.

    The dialog offers a template type selector ("短篇" / "文章") and a
    scrollable list of that type's templates. Confirming a choice copies
    the template's workflow ID, access token and async flag into the form
    variables and into CONFIG, records it as the last used template,
    saves CONFIG, selects the template in the main list and shows a short
    status message. The call blocks until the dialog is closed.
    """
    # Modal dialog attached to the main window.
    dialog = ctk.CTkToplevel(self)
    dialog.title("选择模板")
    dialog.geometry("400x400")
    dialog.transient(self)
    dialog.grab_set()
    dialog.resizable(False, False)

    # Instruction label.
    hint_label = ctk.CTkLabel(dialog, text="请选择要使用的模板:", font=ctk.CTkFont(size=12))
    hint_label.pack(pady=10)

    # Template type selector row.
    type_frame = ctk.CTkFrame(dialog, fg_color="transparent")
    type_frame.pack(fill=ctk.X, padx=10, pady=5)
    type_caption = ctk.CTkLabel(type_frame, text="模板类型:")
    type_caption.pack(side=ctk.LEFT, padx=5)
    dialog_type_var = ctk.StringVar(value=self.coze_generation_type_var.get())
    type_combo = ctk.CTkComboBox(type_frame, variable=dialog_type_var, values=["短篇", "文章"], width=120,
                                 state="readonly")
    type_combo.pack(side=ctk.LEFT, padx=5)

    # Scrollable area holding one button per template.
    list_frame = ctk.CTkScrollableFrame(dialog, width=350, height=200)
    list_frame.pack(fill=ctk.BOTH, expand=True, padx=10, pady=10)

    template_buttons = []
    chosen_index = None  # index picked inside the dialog, None until a button is clicked

    def mark_choice(index):
        """Highlight the clicked template button and remember its index."""
        nonlocal chosen_index
        for button in template_buttons:
            button.configure(fg_color="transparent")
        if index < len(template_buttons):
            template_buttons[index].configure(fg_color=("gray75", "gray25"))
        chosen_index = index

    def rebuild_list(_choice=None):
        """Recreate the template buttons for the type currently selected."""
        nonlocal chosen_index
        for button in template_buttons:
            button.destroy()
        template_buttons.clear()
        chosen_index = None

        shown_type = dialog_type_var.get()
        if shown_type not in self.templates:
            return
        for position, entry in enumerate(self.templates[shown_type]):
            button = ctk.CTkButton(
                list_frame,
                text=entry['name'],
                # Bind the index as a default so each button keeps its own value.
                command=lambda idx=position: mark_choice(idx),
                fg_color="transparent",
                border_width=2,
                anchor="w"
            )
            button.pack(fill=ctk.X, padx=5, pady=2)
            template_buttons.append(button)

    rebuild_list()
    # Rebuild the list whenever another template type is chosen.
    type_combo.configure(command=rebuild_list)

    button_frame = ctk.CTkFrame(dialog, fg_color="transparent")
    button_frame.pack(fill=ctk.X, padx=10, pady=10)

    def on_confirm():
        """Apply the chosen template and close the dialog."""
        if chosen_index is None:
            messagebox.showwarning("未选择模板", "请先选择要使用的模板", parent=dialog)
            return
        index = chosen_index
        current_type = dialog_type_var.get()
        if current_type not in self.templates or index >= len(self.templates[current_type]):
            return
        selected_template = self.templates[current_type][index]

        # Switch both type selectors to the template's type.
        self.coze_generation_type_var.set(current_type)
        self.generation_type_var.set(current_type)

        # Copy the workflow settings into the form variables, one field at
        # a time; each value is read from the template right before it is set.
        form_fields = (
            ('workflow_id', '', self.coze_workflow_id_var),
            ('access_token', '', self.coze_access_token_var),
            ('is_async', 'true', self.coze_is_async_var),
        )
        for key, fallback, variable in form_fields:
            variable.set(selected_template.get(key, fallback))

        # Mirror the same settings into CONFIG.
        for key, fallback, _variable in form_fields:
            CONFIG['Coze'][key] = selected_template.get(key, fallback)

        # Remember this template as the last one used and persist CONFIG.
        CONFIG['Coze']['last_used_template'] = selected_template['name']
        CONFIG['Coze']['last_used_template_type'] = current_type
        save_config(CONFIG)

        # Refresh the main list and select the first template with this name.
        self.update_template_list()
        for position, entry in enumerate(self.templates[current_type]):
            if entry['name'] == selected_template['name']:
                self.on_template_button_click(position)
                break

        # Status message is delayed so the selection above does not overwrite it.
        self.after(100, lambda: self.edit_status_label.configure(text=f"已应用模板 '{selected_template['name']}'",
                                                                 text_color="green"))
        self.after(2100, lambda: self.edit_status_label.configure(text=""))
        dialog.destroy()

    # Confirm and cancel buttons, both packed to the right.
    confirm_button = ctk.CTkButton(button_frame, text="确定", command=on_confirm)
    confirm_button.pack(side=ctk.RIGHT, padx=5)
    cancel_button = ctk.CTkButton(button_frame, text="取消", command=dialog.destroy)
    cancel_button.pack(side=ctk.RIGHT, padx=5)

    # Block until the dialog is closed.
    self.wait_window(dialog)
def clear_template_config(self):
    """Reset the template form to its empty state.

    The variable traces are detached while the fields are cleared, so the
    resets are not handled as user edits, and are always re-attached
    afterwards, even if clearing raises. Name, workflow ID and access
    token become empty, the async flag becomes 'true', and a short
    "cleared" status is shown for two seconds.
    """
    self._unbind_var_trace()
    try:
        self.template_name_var.set('')
        self.coze_workflow_id_var.set('')
        self.coze_access_token_var.set('')
        self.coze_is_async_var.set('true')

        self.edit_status_label.configure(text="已清空", text_color="gray")
        self.after(2000, lambda: self.edit_status_label.configure(text=""))
    finally:
        # Without this the form would stay untracked after a failure above.
        self._setup_var_trace()
def load_templates(self):
    """Load the template lists from the 'Templates' section of CONFIG.

    Each option named 'templates_<type>' whose <type> is already a key of
    self.templates is parsed as JSON. Only a JSON list is accepted, and
    only entries that are dicts with a 'name' key are kept, because the
    rest of the class indexes template['name'] directly. Anything else is
    logged and skipped, leaving an empty list for that type.

    Any unexpected error resets self.templates to empty lists for both
    known types.
    """
    import json

    prefix = 'templates_'
    try:
        if 'Templates' in CONFIG:
            templates_section = CONFIG['Templates']
            for key in templates_section:
                if not key.startswith(prefix):
                    continue
                # Strip only the leading prefix; str.replace would also
                # remove later occurrences inside the type name.
                template_type = key[len(prefix):]
                if template_type not in self.templates:
                    continue

                value = templates_section[key]
                if not isinstance(value, str):
                    logger.warning(f"模板配置{key}的值不是字符串类型: {type(value)}")
                    self.templates[template_type] = []
                    continue

                try:
                    parsed = json.loads(value)
                except json.JSONDecodeError as e:
                    logger.warning(f"解析模板配置{key}失败: {e}")
                    parsed = []

                if not isinstance(parsed, list):
                    logger.warning(f"模板配置{key}不是列表类型: {type(parsed)}")
                    parsed = []

                valid_templates = [t for t in parsed if isinstance(t, dict) and 'name' in t]
                if len(valid_templates) != len(parsed):
                    logger.warning(f"模板配置{key}中有 {len(parsed) - len(valid_templates)} 个无效模板已被忽略")
                self.templates[template_type] = valid_templates

        # Make sure both known types always have a list.
        for template_type in ["短篇", "文章"]:
            if template_type not in self.templates:
                self.templates[template_type] = []
    except Exception as e:
        logger.error(f"加载模板配置失败: {e}")
        self.templates = {"短篇": [], "文章": []}
def save_templates(self):
    """Persist every template list as JSON in the 'Templates' section of CONFIG.

    Creates the section when it is missing, stores each list under the
    option 'templates_<type>' and then saves CONFIG. Failures are logged
    and reported in an error dialog instead of being raised.
    """
    try:
        import json

        if 'Templates' not in CONFIG:
            CONFIG.add_section('Templates')

        for template_type in self.templates:
            option_name = f'templates_{template_type}'
            # ensure_ascii=False keeps non-ASCII template names readable in the file.
            serialized = json.dumps(self.templates[template_type], ensure_ascii=False)
            CONFIG['Templates'][option_name] = serialized

        save_config(CONFIG)
    except Exception as e:
        logger.error(f"保存模板配置失败: {e}")
        messagebox.showerror("保存失败", f"保存模板配置时出错:{e}")
def load_last_used_template(self):
    """Re-select the template recorded in CONFIG as last used.

    Reads 'last_used_template' and 'last_used_template_type' (default
    '文章') from CONFIG['Coze']. When a name is recorded and its type is
    known, both type selectors are switched, the list is refreshed and the
    first template with that name is selected, with a status message shown
    for three seconds. Errors are logged and not raised.
    """
    try:
        coze_section = CONFIG['Coze']
        last_template = coze_section.get('last_used_template', '')
        last_template_type = coze_section.get('last_used_template_type', '文章')

        # Nothing recorded, or the recorded type is unknown: leave the UI alone.
        if not last_template or last_template_type not in self.templates:
            return

        self.coze_generation_type_var.set(last_template_type)
        self.generation_type_var.set(last_template_type)
        self.update_template_list()

        for position, entry in enumerate(self.templates[last_template_type]):
            if entry['name'] != last_template:
                continue
            self.on_template_button_click(position)
            self.edit_status_label.configure(text=f"已加载上次使用的模板 '{last_template}'")
            self.after(3000, lambda: self.edit_status_label.configure(text=""))
            break
    except Exception as e:
        logger.error(f"加载上次使用的模板失败: {e}")
def get_current_template(self):
    """Return the configuration the next run should use.

    When a template is selected and the selection is still valid for the
    current type, that template dict itself is returned. Otherwise a new
    dict is built from the form fields, with '默认模板' as the name when
    the name field is empty.
    """
    chosen = self.selected_template_index
    if chosen is not None:
        current_type = self.coze_generation_type_var.get()
        if current_type in self.templates:
            candidates = self.templates[current_type]
            if chosen < len(candidates):
                return candidates[chosen]

    # No usable selection: fall back to whatever is in the form right now.
    fallback = {}
    fallback['name'] = self.template_name_var.get() or '默认模板'
    fallback['type'] = self.coze_generation_type_var.get()
    fallback['workflow_id'] = self.coze_workflow_id_var.get()
    fallback['access_token'] = self.coze_access_token_var.get()
    fallback['is_async'] = self.coze_is_async_var.get()
    return fallback
def browse_directory(self, var):
    """Let the user pick a folder and store its path in *var*.

    *var* is left untouched when the dialog returns an empty result.
    """
    chosen_dir = filedialog.askdirectory()
    if not chosen_dir:
        return
    var.set(chosen_dir)
def browse_file(self, var, filetypes):
    """Let the user pick a file of the given *filetypes* and store its path in *var*.

    *var* is left untouched when the dialog returns an empty result.
    """
    chosen_file = filedialog.askopenfilename(filetypes=filetypes)
    if not chosen_file:
        return
    var.set(chosen_file)
def browse_excel(self):
    """Let the user pick an Excel workbook and store its path in excel_path_var.

    The variable is left untouched when the dialog returns an empty result.
    """
    excel_filetypes = [("Excel文件", "*.xlsx"), ("所有文件", "*.*")]
    chosen_file = filedialog.askopenfilename(filetypes=excel_filetypes)
    if not chosen_file:
        return
    self.excel_path_var.set(chosen_file)
def test_db_connection(self):
    """Try to open and close a MySQL connection with the form's settings.

    Shows an info dialog on success and an error dialog containing the
    exception text on any failure.
    """
    try:
        connection_settings = {
            'host': self.db_host_var.get(),
            'user': self.db_user_var.get(),
            'password': self.db_password_var.get(),
            'database': self.db_name_var.get(),
        }
        # Connecting is the whole test; the connection is closed right away.
        probe = pymysql.connect(**connection_settings)
        probe.close()
        messagebox.showinfo("连接成功", "数据库连接测试成功!")
    except Exception as e:
        messagebox.showerror("连接失败", f"数据库连接测试失败:{e}")
def preview_image_effect(self):
    """Render a sample image, apply the configured modifications and show it.

    A 400x300 placeholder with a grey rectangle and a text label is
    created, passed through apply_image_modifications() and opened with
    Image.show(). Any failure is reported in an error dialog.
    """
    try:
        # Build the placeholder image.
        img = Image.new('RGB', (400, 300), color=(240, 240, 240))
        draw = ImageDraw.Draw(img)
        draw.rectangle([50, 50, 350, 250], fill=(200, 200, 200))

        # Pillow's built-in default font has no CJK glyphs (older releases
        # raise UnicodeEncodeError for such text), so look for a CJK-capable
        # TrueType font and fall back to an ASCII label when none is found.
        label, font = "Sample Image", None
        cjk_font_names = (
            "msyh.ttc", "simhei.ttf", "simsun.ttc",
            "PingFang.ttc", "NotoSansCJK-Regular.ttc", "wqy-microhei.ttc",
        )
        for font_name in cjk_font_names:
            try:
                font = ImageFont.truetype(font_name, 20)
            except (OSError, ImportError):
                # Font file missing, or FreeType support unavailable.
                continue
            label = "示例图片"
            break
        draw.text((150, 140), label, fill=(0, 0, 0), font=font)

        # Apply the configured effects and open the result in the system viewer.
        modified_img = self.apply_image_modifications(img)
        modified_img.show()
    except Exception as e:
        messagebox.showerror("预览失败", f"生成预览图片时出错:{e}")
def apply_image_modifications(self, img):
    """Apply the configured crop, random rotation and random brightness to *img*.

    The settings are read from the form variables and converted to float.
    The image is centre-cropped to crop_percent times its width and
    height, rotated by a random angle between the min and max rotation,
    and its brightness is scaled by a random factor between the min and
    max brightness.

    Returns the modified image. On any error an error dialog is shown and
    the image is returned in whatever state it had reached.
    """
    width, height = img.size
    try:
        # Read the settings from the UI.
        crop_percent = float(self.crop_percent_var.get())
        rotation_low = float(self.min_rotation_var.get())
        rotation_high = float(self.max_rotation_var.get())
        brightness_low = float(self.min_brightness_var.get())
        brightness_high = float(self.max_brightness_var.get())

        # Centre crop.
        crop_width = int(width * crop_percent)
        crop_height = int(height * crop_percent)
        left = (width - crop_width) // 2
        top = (height - crop_height) // 2
        img = img.crop((left, top, left + crop_width, top + crop_height))

        # Random rotation.
        img = img.rotate(random.uniform(rotation_low, rotation_high))

        # Random brightness.
        img = ImageEnhance.Brightness(img).enhance(random.uniform(brightness_low, brightness_high))
        return img
    except Exception as e:
        messagebox.showerror("应用效果失败", f"应用图片修改效果时出错:{e}")
        return img
def start_processing(self):
    """Validate the inputs and start the link-processing job in a daemon thread.

    Does nothing but inform the user when a job is already running or the
    Excel file does not exist. The thread count is clamped to
    1..MAX_THREADS and falls back to 1 when it cannot be read as an
    integer. While the job runs the start button is disabled and
    update_progress() is polled. Any error during start-up is shown in a
    dialog and the button and running flag are restored.
    """
    if self.running:
        messagebox.showinfo("处理中", "已有任务正在处理中,请等待完成")
        return
    try:
        # The Excel file must exist before anything is started.
        excel_path = self.excel_path_var.get()
        if not os.path.exists(excel_path):
            messagebox.showerror("文件错误", f"Excel文件不存在{excel_path}")
            return

        # Thread count: invalid input falls back to a single thread.
        # `except Exception` rather than a bare `except`, so that
        # KeyboardInterrupt/SystemExit are no longer swallowed here.
        try:
            num_threads = int(self.thread_count_var.get())
            if num_threads < 1:
                num_threads = 1
            elif num_threads > MAX_THREADS:
                num_threads = MAX_THREADS
        except Exception:
            num_threads = 1

        # Lock the UI for the duration of the job.
        self.start_button.configure(state="disabled")
        self.running = True
        self.log_text.delete("1.0", "end")

        # Snapshot the run settings on the UI thread.
        ai_service = self.ai_service_var.get()
        generation_type = self.generation_type_var.get()
        current_template = self.get_current_template()

        # Run the job in the background so the UI stays responsive.
        self.thread = threading.Thread(target=self.run_processing,
                                       args=(excel_path, num_threads, ai_service, generation_type,
                                             current_template))
        self.thread.daemon = True
        self.thread.start()

        # Start polling for progress.
        self.after(100, self.update_progress)
    except Exception as e:
        messagebox.showerror("启动失败", f"启动处理任务时出错:{e}")
        self.start_button.configure(state="normal")
        self.running = False
def run_processing(self, excel_path, num_threads, ai_service, generation_type=None, current_template=None):
    """Run the link-processing job; meant to be the target of the worker thread.

    Args:
        excel_path: Path of the Excel file; assigned to the module-level
            TITLE_BASE_PATH.
        num_threads: Number of worker threads passed to link_to_text().
        ai_service: Name of the AI service; template settings are applied
            to CONFIG only when this is 'coze'.
        generation_type: Generation type passed through to link_to_text().
        current_template: Template dict whose workflow_id / access_token /
            is_async temporarily override CONFIG['Coze'] during the run.

    Results and errors are reported through the logger and message boxes
    scheduled with self.after(). The original CONFIG values are restored,
    the start button is re-enabled and self.running is cleared in every
    case.
    """
    global TITLE_BASE_PATH
    # Bound before the try block so the finally clause can always read it.
    original_config = None
    try:
        # NOTE(review): this rebinds the name in this module only. If
        # link_to_text() reads TITLE_BASE_PATH from another module, the
        # selected file is not picked up there — confirm against main_process.
        TITLE_BASE_PATH = excel_path

        start_time = time.time()

        # Temporarily override the Coze settings with the template's values.
        if current_template and ai_service == 'coze':
            original_config = {
                'workflow_id': CONFIG['Coze']['workflow_id'],
                'access_token': CONFIG['Coze']['access_token'],
                'is_async': CONFIG['Coze']['is_async'],
            }
            CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
            CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
            CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
            logger.info(f"应用模板配置: {current_template.get('name')}")
            logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
            # The token itself is never logged, only a mask of the same length.
            logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
            logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")

        logger.info(f"开始处理链接,使用 {num_threads} 个线程,生成类型: {generation_type}")
        if current_template:
            logger.info(f"使用模板: {current_template.get('name', '未命名')}")
        results = link_to_text(num_threads=num_threads, ai_service=ai_service, current_template=current_template,
                               generation_type=generation_type)

        # Count the successes. The success flag is read by index because
        # the guard accepts any tuple with at least two items; unpacking
        # into three names would raise for every other length.
        total_links = len(results)
        success_links = 0
        for result in results:
            if isinstance(result, tuple) and len(result) >= 2:
                if result[1]:
                    success_links += 1
            else:
                logger.warning(f"意外的结果格式: {result}")

        elapsed_time = time.time() - start_time

        logger.info(
            f"处理完成,共处理 {total_links} 个链接,成功 {success_links} 个,失败 {total_links - success_links}")
        logger.info(f"总耗时: {elapsed_time:.2f}")

        # The summary dialog is scheduled through self.after().
        def show_result():
            try:
                messagebox.showinfo("处理完成",
                                    f"共处理 {total_links} 个链接\n成功: {success_links}\n失败: {total_links - success_links}\n总耗时: {elapsed_time:.2f}")
            except Exception as e:
                logger.error(f"显示处理结果时出错: {e}")
                messagebox.showerror("错误", f"显示处理结果时出错: {e}")

        self.after(0, show_result)
    except Exception as e:
        logger.error(f"处理任务出错: {e}")
        # Copy the text now: `e` is unbound once the except block ends.
        error_msg = str(e)
        self.after(0, lambda: messagebox.showerror("处理错误", f"处理任务出错: {error_msg}"))
    finally:
        # Put the original Coze settings back if they were overridden.
        if original_config is not None:
            CONFIG['Coze']['workflow_id'] = original_config['workflow_id']
            CONFIG['Coze']['access_token'] = original_config['access_token']
            CONFIG['Coze']['is_async'] = original_config['is_async']
        # Re-enable the start button and clear the running flag.
        self.after(0, lambda: self.start_button.configure(state="normal"))
        self.running = False
def update_progress(self):
    """Refresh the progress bar and window title while a job is running.

    Progress is the size of result_queue divided by the combined size of
    task_queue and result_queue. The method reschedules itself every
    500 ms and stops once self.running is False. An error is logged and
    ends the polling.
    """
    if not self.running:
        return
    try:
        # Queue sizes are sampled in this order: pending, then finished (twice).
        pending = task_queue.qsize()
        total = pending + result_queue.qsize()
        done = result_queue.qsize()
        if total > 0:
            progress = done / total
            self.progress_bar.set(progress)
            self.title(f"文章采集与处理工具 - 进度: {progress * 100:.1f}%")
        # Schedule the next poll.
        self.after(500, self.update_progress)
    except Exception as e:
        logger.error(f"更新进度出错: {e}")
def on_close(self):
    """Handle the window-close request.

    Closes immediately when no job is running; otherwise asks for
    confirmation first and closes only when the user agrees.
    """
    may_close = (not self.running) or messagebox.askyesno("确认退出", "任务正在处理中,确定要退出吗?")
    if may_close:
        self.destroy()
# 日志处理器类,用于将日志输出到文本框
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a text widget.

    The widget must provide insert(), see() and after(). The text is not
    written directly: the update is handed to the widget's after() so it
    is scheduled through the widget.
    """

    def __init__(self, text_widget):
        """Remember *text_widget* as the target of all log output."""
        logging.Handler.__init__(self)
        self.text_widget = text_widget

    def emit(self, record):
        """Format *record* and schedule it to be appended to the widget.

        Failures while formatting or scheduling (for example when the
        widget no longer exists) go to Handler.handleError() instead of
        propagating into the code that issued the log call.
        """
        try:
            msg = self.format(record)

            def append():
                self.text_widget.insert("end", msg + '\n')
                # Keep the newest line visible.
                self.text_widget.see("end")

            self.text_widget.after(0, append)
        except Exception:
            self.handleError(record)
# 主函数
def main():
    """Program entry point: check the licence, set up logging and run the GUI.

    Returns early, without starting the GUI, when licence validation
    fails. Otherwise logging is configured to write to
    'article_replace.log' and the console, the article and image
    directories are created when missing, and the main window's event
    loop is run.
    """
    # CustomTkinter appearance.
    ctk.set_appearance_mode("System")
    ctk.set_default_color_theme("blue")

    # NOTE(review): the secret key is hard-coded in source and the API URL
    # uses plain HTTP — consider loading the key from configuration and
    # using HTTPS if the service supports it.
    validator = AuthValidator(
        software_id="ArticleReplace",
        api_url="http://km.taisan.online/api/v1",
        gui_mode=True,
        secret_key="taiyi1224"
    )
    if not validator.validate():
        print("授权验证失败,程序退出")
        return
    print("授权验证成功,启动程序...")

    # Log to a UTF-8 file and to the console.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("article_replace.log", encoding='utf-8'),
            logging.StreamHandler()
        ]
    )

    # Create the working directories; exist_ok avoids the check-then-create race.
    for directory in (ARTICLES_BASE_PATH, IMGS_BASE_PATH):
        os.makedirs(directory, exist_ok=True)

    # Start the GUI.
    app = ArticleReplaceApp()
    app.mainloop()
# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()