diff --git a/ArticleReplace.py b/ArticleReplace.py new file mode 100644 index 0000000..8356dda --- /dev/null +++ b/ArticleReplace.py @@ -0,0 +1,1579 @@ +import json +import sys # 导入sys模块 + +from PIL import Image, ImageDraw, ImageFont, ImageEnhance +import time +import random + +import threading +import tkinter as tk + +from config import * +from tkinter import ttk, messagebox, filedialog, simpledialog +from tkinter.scrolledtext import ScrolledText +import pandas as pd +import pymysql + +from main_process import link_to_text, task_queue, result_queue + +sys.setrecursionlimit(5000) + + +class ArticleReplaceApp(tk.Tk): + def __init__(self): + super().__init__() + + self.title("文章工作流调用工具(软件仅供交流使用)") + self.geometry("900x600") + + # 创建标签页控件 + self.notebook = ttk.Notebook(self) + self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + + # 创建主页面 + self.main_frame = ttk.Frame(self.notebook) + self.notebook.add(self.main_frame, text="主页面") + + # 创建配置页面 + self.config_frame = ttk.Frame(self.notebook) + self.notebook.add(self.config_frame, text="配置") + + # 创建免责声明页面 + self.disclaimer_frame = ttk.Frame(self.notebook) + self.notebook.add(self.disclaimer_frame, text="免责声明") + + # 初始化变量 + self.running = False + self.thread = None + self.total_links = 0 + self.processed_links = 0 + + # 初始化Coze配置变量 + self.template_name_var = tk.StringVar() + self.coze_workflow_id_var = tk.StringVar(value=CONFIG['Coze']['workflow_id']) + self.coze_access_token_var = tk.StringVar(value=CONFIG['Coze']['access_token']) + self.coze_is_async_var = tk.StringVar(value=CONFIG['Coze'].get('is_async', 'true')) + # self.coze_input_data_template_var = tk.StringVar(value=CONFIG['Coze'].get('input_data_template', '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}')) + + # 初始化模板数据结构 + self.templates = { + "短篇": [], + "文章": [] + } + + # 初始化主页面 + self.init_main_frame() + # 初始化配置页面 + self.init_config_frame() + # 初始化免责声明页面 + self.init_disclaimer_frame() + + # 设置关闭窗口事件 + 
self.protocol("WM_DELETE_WINDOW", self.on_close) + + def init_main_frame(self): + # 创建左侧控制面板 + control_frame = ttk.LabelFrame(self.main_frame, text="控制面板") + control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10) + + # Excel文件选择 + ttk.Label(control_frame, text="Excel文件:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.excel_path_var = tk.StringVar(value=TITLE_BASE_PATH) + ttk.Entry(control_frame, textvariable=self.excel_path_var, width=30).grid(row=0, column=1, padx=5, pady=5) + ttk.Button(control_frame, text="浏览", command=self.browse_excel).grid(row=0, column=2, padx=5, pady=5) + + # 线程数设置 + ttk.Label(control_frame, text="线程数:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.thread_count_var = tk.StringVar(value="1") + ttk.Spinbox(control_frame, from_=1, to=MAX_THREADS, textvariable=self.thread_count_var, width=5).grid(row=1, + column=1, + padx=5, + pady=5, + sticky=tk.W) + + # AI服务提供商选择 + ttk.Label(control_frame, text="工作流选择:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.ai_service_var = tk.StringVar(value="coze") + ai_service_combo = ttk.Combobox(control_frame, textvariable=self.ai_service_var, values=["dify", "coze"], + width=10, state="readonly") + ai_service_combo.grid(row=2, column=1, padx=5, pady=5, sticky=tk.W) + + # 生成类型选择 + ttk.Label(control_frame, text="生成类型:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.generation_type_var = tk.StringVar(value="文章") + self.generation_type_combo = ttk.Combobox(control_frame, textvariable=self.generation_type_var, + values=["短篇", "文章"], width=10, state="readonly") + self.generation_type_combo.grid(row=3, column=1, padx=5, pady=5, sticky=tk.W) + self.generation_type_combo.bind("<>", self.on_generation_type_changed) + + # 开始按钮 + self.start_button = ttk.Button(control_frame, text="开始处理", command=self.start_processing) + self.start_button.grid(row=4, column=0, columnspan=3, padx=5, pady=20) + + # 进度条 + ttk.Label(control_frame, text="处理进度:").grid(row=5, column=0, 
padx=5, pady=5, sticky=tk.W) + self.progress_var = tk.DoubleVar() + ttk.Progressbar(control_frame, variable=self.progress_var, maximum=100).grid(row=5, column=1, columnspan=2, + padx=5, pady=5, sticky=tk.EW) + + # 创建右侧日志面板 + log_frame = ttk.LabelFrame(self.main_frame, text="日志") + log_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10) + + # 日志文本框 + self.log_text = ScrolledText(log_frame, width=70, height=30) + self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + self.log_text.config(state=tk.DISABLED) + + # 添加日志处理器 + self.log_handler = LogTextHandler(self.log_text) + self.log_handler.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + self.log_handler.setFormatter(formatter) + logger.addHandler(self.log_handler) + + def init_config_frame(self): + # 创建配置标签页 + config_notebook = ttk.Notebook(self.config_frame) + config_notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + # 创建各个配置页面 + general_frame = ttk.Frame(config_notebook) + database_frame = ttk.Frame(config_notebook) + dify_frame = ttk.Frame(config_notebook) + coze_frame = ttk.Frame(config_notebook) + baidu_frame = ttk.Frame(config_notebook) + image_frame = ttk.Frame(config_notebook) + keywords_frame = ttk.Frame(config_notebook) + + # 添加到标签页 + config_notebook.add(general_frame, text="常规设置") + config_notebook.add(database_frame, text="数据库设置") + config_notebook.add(dify_frame, text="Dify设置") + config_notebook.add(coze_frame, text="Coze设置") + config_notebook.add(baidu_frame, text="百度API设置") + config_notebook.add(image_frame, text="图片处理设置") + config_notebook.add(keywords_frame, text="违禁词设置") + + # 初始化各个配置页面 + self.init_general_config(general_frame) + self.init_database_config(database_frame) + self.init_dify_config(dify_frame) + self.init_coze_config(coze_frame) + self.init_baidu_config(baidu_frame) + self.init_image_config(image_frame) + self.init_keywords_config(keywords_frame) + + # 保存按钮 + save_button = 
ttk.Button(self.config_frame, text="保存所有配置", command=self.save_all_configs) + save_button.pack(side=tk.RIGHT, padx=10, pady=10) + + def init_general_config(self, parent): + # Chrome用户目录 + ttk.Label(parent, text="Chrome用户目录:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.chrome_dir_var = tk.StringVar(value=CONFIG['General']['chrome_user_dir']) + ttk.Entry(parent, textvariable=self.chrome_dir_var, width=50).grid(row=0, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.chrome_dir_var)).grid(row=0, + column=2, + padx=5, pady=5) + + # 文章保存路径 + ttk.Label(parent, text="文章保存路径:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.articles_path_var = tk.StringVar(value=CONFIG['General']['articles_path']) + ttk.Entry(parent, textvariable=self.articles_path_var, width=50).grid(row=1, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.articles_path_var)).grid(row=1, + column=2, + padx=5, + pady=5) + + # 图片保存路径 + ttk.Label(parent, text="图片保存路径:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.images_path_var = tk.StringVar(value=CONFIG['General']['images_path']) + self.grid = ttk.Entry(parent, textvariable=self.images_path_var, width=50).grid(row=2, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.images_path_var)).grid(row=2, + column=2, + padx=5, + pady=5) + + # Excel文件路径 + ttk.Label(parent, text="默认Excel文件:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.excel_file_var = tk.StringVar(value=CONFIG['General']['title_file']) + ttk.Entry(parent, textvariable=self.excel_file_var, width=50).grid(row=3, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_file(self.excel_file_var, [("Excel文件", "*.xlsx"), + ("所有文件", + "*.*")])).grid(row=3, + column=2, + padx=5, + pady=5) + + # 最大线程数 + ttk.Label(parent, text="最大线程数:").grid(row=4, column=0, padx=5, pady=5, 
sticky=tk.W) + self.max_threads_var = tk.StringVar(value=CONFIG['General']['max_threads']) + ttk.Spinbox(parent, from_=1, to=10, textvariable=self.max_threads_var, width=5).grid(row=4, column=1, padx=5, + pady=5, sticky=tk.W) + + # 保存按钮 + ttk.Button(parent, text="保存配置", command=self.save_general_config).grid(row=5, column=1, padx=5, pady=10, + sticky=tk.E) + + def init_database_config(self, parent): + # 数据库主机 + ttk.Label(parent, text="数据库主机:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.db_host_var = tk.StringVar(value=CONFIG['Database']['host']) + ttk.Entry(parent, textvariable=self.db_host_var, width=30).grid(row=0, column=1, padx=5, pady=5) + + # 数据库用户名 + ttk.Label(parent, text="数据库用户名:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.db_user_var = tk.StringVar(value=CONFIG['Database']['user']) + ttk.Entry(parent, textvariable=self.db_user_var, width=30).grid(row=1, column=1, padx=5, pady=5) + + # 数据库密码 + ttk.Label(parent, text="数据库密码:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.db_password_var = tk.StringVar(value=CONFIG['Database']['password']) + ttk.Entry(parent, textvariable=self.db_password_var, width=30, show="*").grid(row=2, column=1, padx=5, pady=5) + + # 数据库名称 + ttk.Label(parent, text="数据库名称:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.db_name_var = tk.StringVar(value=CONFIG['Database']['database']) + ttk.Entry(parent, textvariable=self.db_name_var, width=30).grid(row=3, column=1, padx=5, pady=5) + + # 测试连接按钮 + ttk.Button(parent, text="测试连接", command=self.test_db_connection).grid(row=4, column=1, padx=5, pady=5, + sticky=tk.E) + + # 保存按钮 + ttk.Button(parent, text="保存配置", command=self.save_database_config).grid(row=5, column=1, padx=5, pady=10, + sticky=tk.E) + + def init_dify_config(self, parent): + # Dify API Key + ttk.Label(parent, text="API Key:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_api_key_var = tk.StringVar(value=CONFIG['Dify']['api_key']) + ttk.Entry(parent, 
textvariable=self.dify_api_key_var, width=50).grid(row=0, column=1, padx=5, pady=5) + + # Dify User ID + ttk.Label(parent, text="User ID:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_user_id_var = tk.StringVar(value=CONFIG['Dify']['user_id']) + ttk.Entry(parent, textvariable=self.dify_user_id_var, width=30).grid(row=1, column=1, padx=5, pady=5) + + # Dify URL + ttk.Label(parent, text="URL:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_url_var = tk.StringVar(value=CONFIG['Dify']['url']) + ttk.Entry(parent, textvariable=self.dify_url_var, width=50).grid(row=2, column=1, padx=5, pady=5) + + # Dify Input Data Template + ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_input_data_template_var = tk.StringVar( + value=CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')) # 添加默认值 + ttk.Entry(parent, textvariable=self.dify_input_data_template_var, width=50).grid(row=3, column=1, padx=5, + pady=5) + + # 保存按钮 + ttk.Button(parent, text="保存配置", command=self.save_dify_config).grid(row=4, column=1, padx=5, pady=10, + sticky=tk.E) + + def init_coze_config(self, parent): + # 生成类型选择(与主页面联动) + type_frame = ttk.Frame(parent) + type_frame.grid(row=0, column=0, columnspan=3, padx=5, pady=5, sticky=tk.EW) + ttk.Label(type_frame, text="生成类型:").pack(side=tk.LEFT, padx=5) + self.coze_generation_type_var = tk.StringVar(value="短篇") + self.coze_generation_type_combo = ttk.Combobox(type_frame, textvariable=self.coze_generation_type_var, + values=["短篇", "文章"], width=10, state="readonly") + self.coze_generation_type_combo.pack(side=tk.LEFT, padx=5) + self.coze_generation_type_combo.bind("<>", self.on_coze_generation_type_changed) + + # 编辑状态标签 + self.edit_status_label = ttk.Label(type_frame, text="", foreground="blue") + self.edit_status_label.pack(side=tk.LEFT, padx=20) + + # 加载已保存的模板 + self.load_templates() + + # 初始化变量跟踪 + self._setup_var_trace() + + # 模板管理框架 + template_frame = 
ttk.LabelFrame(parent, text="模板管理") + template_frame.grid(row=1, column=0, columnspan=3, padx=5, pady=10, sticky=tk.EW) + + # 模板列表和滚动条 + list_frame = ttk.Frame(template_frame) + list_frame.grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + ttk.Label(template_frame, text="模板列表:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.template_listbox = tk.Listbox(list_frame, height=5, width=30) + scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.template_listbox.yview) + self.template_listbox.configure(yscrollcommand=scrollbar.set) + self.template_listbox.pack(side=tk.LEFT, fill=tk.Y) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + self.template_listbox.bind("<>", self.on_template_selected) + + # 模板操作按钮 + button_frame = ttk.Frame(template_frame) + button_frame.grid(row=1, column=1, padx=10, pady=5, sticky=tk.N) + ttk.Button(button_frame, text="新增模板", command=self.add_template).pack(pady=2) + ttk.Button(button_frame, text="删除模板", command=self.delete_template).pack(pady=2) + ttk.Button(button_frame, text="重命名模板", command=self.rename_template).pack(pady=2) + ttk.Button(button_frame, text="保存模板", command=self.save_template).pack(pady=2) + ttk.Button(button_frame, text="复制模板", command=self.duplicate_template).pack(pady=2) + ttk.Button(button_frame, text="使用模板", command=self.use_template).pack(pady=2) + + # 当前模板配置 + config_frame = ttk.LabelFrame(parent, text="当前模板配置") + config_frame.grid(row=2, column=0, columnspan=3, padx=5, pady=10, sticky=tk.EW) + + # 模板名称 + ttk.Label(config_frame, text="模板名称:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + ttk.Entry(config_frame, textvariable=self.template_name_var, width=30).grid(row=0, column=1, padx=5, pady=5) + + # Coze Workflow ID + ttk.Label(config_frame, text="Workflow ID:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + ttk.Entry(config_frame, textvariable=self.coze_workflow_id_var, width=50).grid(row=1, column=1, padx=5, pady=5) + + # Coze Access Token + ttk.Label(config_frame, text="Access 
Token:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + ttk.Entry(config_frame, textvariable=self.coze_access_token_var, width=50).grid(row=2, column=1, padx=5, pady=5) + + # Coze Is Async + ttk.Label(config_frame, text="Is Async:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + ttk.Combobox(config_frame, textvariable=self.coze_is_async_var, values=["true", "false"], width=10, + state="readonly").grid(row=3, column=1, padx=5, pady=5, sticky=tk.W) + + # # Coze Input Data Template + # ttk.Label(config_frame, text="Input Data模板:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W) + # # Variable already initialized in __init__ + # ttk.Entry(config_frame, textvariable=self.coze_input_data_template_var, width=50).grid(row=4, column=1, padx=5, pady=5) + + # 保存按钮 + ttk.Button(config_frame, text="保存配置", command=self.save_coze_config).grid(row=5, column=1, padx=5, pady=10, + sticky=tk.E) + + # 更新模板列表 + self.update_template_list() + + # 自动加载上次使用的模板 + self.load_last_used_template() + + def init_baidu_config(self, parent): + # 百度 API Key + ttk.Label(parent, text="API Key:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.baidu_api_key_var = tk.StringVar(value=CONFIG['Baidu']['api_key']) + ttk.Entry(parent, textvariable=self.baidu_api_key_var, width=50).grid(row=0, column=1, padx=5, pady=5) + + # 百度 Secret Key + ttk.Label(parent, text="Secret Key:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.baidu_secret_key_var = tk.StringVar(value=CONFIG['Baidu']['secret_key']) + ttk.Entry(parent, textvariable=self.baidu_secret_key_var, width=50).grid(row=1, column=1, padx=5, pady=5) + + # 是否启用违规检测 + ttk.Label(parent, text="启用违规检测:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.baidu_enable_detection_var = tk.StringVar(value=CONFIG['Baidu'].get('enable_detection', 'false')) + enable_detection_combo = ttk.Combobox(parent, textvariable=self.baidu_enable_detection_var, + values=["true", "false"], width=10, state="readonly") + 
enable_detection_combo.grid(row=2, column=1, padx=5, pady=5, sticky=tk.W) + + # 保存按钮 + ttk.Button(parent, text="保存配置", command=self.save_baidu_config).grid(row=3, column=1, padx=5, pady=10, + sticky=tk.E) + + def init_image_config(self, parent): + # 裁剪百分比 + ttk.Label(parent, text="裁剪百分比:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.crop_percent_var = tk.StringVar(value=CONFIG['ImageModify']['crop_percent']) + ttk.Entry(parent, textvariable=self.crop_percent_var, width=10).grid(row=0, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最小旋转角度 + ttk.Label(parent, text="最小旋转角度:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.min_rotation_var = tk.StringVar(value=CONFIG['ImageModify']['min_rotation']) + ttk.Entry(parent, textvariable=self.min_rotation_var, width=10).grid(row=1, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最大旋转角度 + ttk.Label(parent, text="最大旋转角度:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.max_rotation_var = tk.StringVar(value=CONFIG['ImageModify']['max_rotation']) + ttk.Entry(parent, textvariable=self.max_rotation_var, width=10).grid(row=2, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最小亮度 + ttk.Label(parent, text="最小亮度:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.min_brightness_var = tk.StringVar(value=CONFIG['ImageModify']['min_brightness']) + ttk.Entry(parent, textvariable=self.min_brightness_var, width=10).grid(row=3, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最大亮度 + ttk.Label(parent, text="最大亮度:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W) + self.max_brightness_var = tk.StringVar(value=CONFIG['ImageModify']['max_brightness']) + ttk.Entry(parent, textvariable=self.max_brightness_var, width=10).grid(row=4, column=1, padx=5, pady=5, + sticky=tk.W) + + # 水印文字 + ttk.Label(parent, text="水印文字:").grid(row=0, column=2, padx=5, pady=5, sticky=tk.W) + self.watermark_text_var = tk.StringVar(value=CONFIG['ImageModify']['watermark_text']) + ttk.Entry(parent, 
textvariable=self.watermark_text_var, width=30).grid(row=0, column=3, padx=5, pady=5) + + # 水印透明度 + ttk.Label(parent, text="水印透明度:").grid(row=1, column=2, padx=5, pady=5, sticky=tk.W) + self.watermark_opacity_var = tk.StringVar(value=CONFIG['ImageModify']['watermark_opacity']) + ttk.Entry(parent, textvariable=self.watermark_opacity_var, width=10).grid(row=1, column=3, padx=5, pady=5, + sticky=tk.W) + + # 蒙版透明度 + ttk.Label(parent, text="蒙版透明度:").grid(row=2, column=2, padx=5, pady=5, sticky=tk.W) + self.overlay_opacity_var = tk.StringVar(value=CONFIG['ImageModify']['overlay_opacity']) + ttk.Entry(parent, textvariable=self.overlay_opacity_var, width=10).grid(row=2, column=3, padx=5, pady=5, + sticky=tk.W) + + # 预览按钮 + ttk.Button(parent, text="预览效果", command=self.preview_image_effect).grid(row=4, column=3, padx=5, pady=5, + sticky=tk.E) + + # 保存按钮 + ttk.Button(parent, text="保存配置", command=self.save_image_config).grid(row=5, column=3, padx=5, pady=10, + sticky=tk.E) + + def init_keywords_config(self, parent): + # 违禁词列表 + ttk.Label(parent, text="违禁词列表:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.banned_words_text = ScrolledText(parent, width=60, height=15) + self.banned_words_text.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky=tk.NSEW) + self.banned_words_text.insert(tk.END, CONFIG['Keywords']['banned_words'].replace(',', '\n')) + + # 保存按钮 + ttk.Button(parent, text="保存违禁词", command=self.save_banned_words).grid(row=2, column=1, padx=5, pady=5, + sticky=tk.E) + + # 配置行列权重 + parent.columnconfigure(0, weight=1) + parent.rowconfigure(1, weight=1) + + def init_disclaimer_frame(self): + # 创建免责声明内容框架 + disclaimer_content = ttk.Frame(self.disclaimer_frame) + disclaimer_content.pack(fill=tk.BOTH, expand=True, padx=20, pady=20) + + # 标题 + title_label = ttk.Label(disclaimer_content, text="免责声明", font=("Arial", 16, "bold")) + title_label.pack(pady=10) + + # 免责声明文本 + disclaimer_text = ScrolledText(disclaimer_content, width=80, height=20, wrap=tk.WORD) + 
disclaimer_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + disclaimer_text.insert(tk.END, """ +软件使用免责声明 + +1. 合法使用声明 + 本软件仅供合法、正当用途使用。用户应当遵守中华人民共和国相关法律法规,不得将本软件用于任何违法犯罪活动。 + +2. 内容责任声明 + 用户通过本软件生成、处理或发布的所有内容,其版权归属、合法性及内容真实性由用户自行负责。本软件开发者不对用户使用本软件处理的内容承担任何法律责任。 + +3. 使用风险声明 + 用户应自行承担使用本软件的风险。本软件按"现状"提供,不提供任何明示或暗示的保证,包括但不限于适销性、特定用途适用性和非侵权性的保证。 + +4. 禁止用途 + 严禁将本软件用于以下活动: + - 违反国家法律法规的活动 + - 侵犯他人知识产权或其他合法权益的活动 + - 传播虚假、欺诈或误导性信息的活动 + - 从事任何可能危害国家安全、社会稳定的活动 + - 其他违背社会公德、商业道德的活动 + +5. 责任限制 + 在法律允许的最大范围内,对于因使用或无法使用本软件而导致的任何直接、间接、偶然、特殊、惩罚性或后果性损害,本软件开发者不承担任何责任。 + +6. 协议更新 + 本免责声明可能会不定期更新,更新后的内容将在软件中公布,不再另行通知。用户继续使用本软件即表示接受修改后的免责声明。 + +7. 最终解释 + 本免责声明的最终解释权归本软件开发者所有。 + """) + disclaimer_text.config(state=tk.DISABLED) # 设置为只读 + + # 确认按钮 + confirm_frame = ttk.Frame(disclaimer_content) + confirm_frame.pack(pady=10) + ttk.Button(confirm_frame, text="我已阅读并同意以上声明", command=lambda: self.notebook.select(0)).pack() + + def save_all_configs(self): + """保存所有配置到配置文件""" + try: + # 保存所有单独的配置 + self.save_general_config() + self.save_database_config() + self.save_dify_config() + self.save_coze_config() + self.save_baidu_config() + self.save_image_config() + self.save_banned_words() + + messagebox.showinfo("保存成功", "所有配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存配置时出错:{e}") + + # 保存按钮 + save_button = ttk.Button(self.config_frame, text="保存所有配置", command=self.save_all_configs) + save_button.pack(side=tk.RIGHT, padx=10, pady=10) + + def on_coze_generation_type_changed(self, event=None): + """coze页面生成类型改变时的处理""" + + def save_general_config(self): + # 保存常规配置 + try: + CONFIG['General']['chrome_user_dir'] = self.chrome_dir_var.get() + CONFIG['General']['articles_path'] = self.articles_path_var.get() + CONFIG['General']['images_path'] = self.images_path_var.get() + CONFIG['General']['title_file'] = self.excel_file_var.get() + CONFIG['General']['max_threads'] = self.max_threads_var.get() + + save_config(CONFIG) + + # 更新全局变量 + global USER_DIR_PATH, ARTICLES_BASE_PATH, 
IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS + USER_DIR_PATH = CONFIG['General']['chrome_user_dir'] + ARTICLES_BASE_PATH = CONFIG['General']['articles_path'] + IMGS_BASE_PATH = CONFIG['General']['images_path'] + TITLE_BASE_PATH = CONFIG['General']['title_file'] + MAX_THREADS = int(CONFIG['General']['max_threads']) + + # 创建必要的目录 + if not os.path.exists(ARTICLES_BASE_PATH): + os.makedirs(ARTICLES_BASE_PATH) + if not os.path.exists(IMGS_BASE_PATH): + os.makedirs(IMGS_BASE_PATH) + + messagebox.showinfo("保存成功", "常规配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存常规配置时出错:{e}") + + def save_database_config(self): + # 保存数据库配置 + try: + CONFIG['Database']['host'] = self.db_host_var.get() + CONFIG['Database']['user'] = self.db_user_var.get() + CONFIG['Database']['password'] = self.db_password_var.get() + CONFIG['Database']['database'] = self.db_name_var.get() + + save_config(CONFIG) + messagebox.showinfo("保存成功", "数据库配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存数据库配置时出错:{e}") + + def save_dify_config(self): + # 保存Dify配置 + try: + CONFIG['Dify']['api_key'] = self.dify_api_key_var.get() + CONFIG['Dify']['user_id'] = self.dify_user_id_var.get() + CONFIG['Dify']['url'] = self.dify_url_var.get() + CONFIG['Dify']['input_data_template'] = self.dify_input_data_template_var.get() + + save_config(CONFIG) + messagebox.showinfo("保存成功", "Dify配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存Dify配置时出错:{e}") + + def save_coze_config(self): + # 保存当前Coze模板配置 + try: + # 获取当前选中的模板 + selection = self.template_listbox.curselection() + if not selection: + # 如果没有选中模板,只保存全局Coze配置 + CONFIG['Coze']['workflow_id'] = self.coze_workflow_id_var.get() + CONFIG['Coze']['access_token'] = self.coze_access_token_var.get() + CONFIG['Coze']['is_async'] = self.coze_is_async_var.get() + save_config(CONFIG) + messagebox.showinfo("保存成功", "Coze全局配置已保存") + return + + # 获取当前选中的模板索引 + index = selection[0] + current_type = self.coze_generation_type_var.get() + + 
if current_type not in self.templates or index >= len(self.templates[current_type]): + messagebox.showerror("错误", "无效的模板选择") + return + + # 更新模板配置 + template = self.templates[current_type][index] + template['name'] = self.template_name_var.get() + template['workflow_id'] = self.coze_workflow_id_var.get() + template['access_token'] = self.coze_access_token_var.get() + template['is_async'] = self.coze_is_async_var.get() + + # 保存模板到配置文件 + self.save_templates() + + # 同时更新全局Coze配置(如果需要的话) + CONFIG['Coze']['workflow_id'] = self.coze_workflow_id_var.get() + CONFIG['Coze']['access_token'] = self.coze_access_token_var.get() + CONFIG['Coze']['is_async'] = self.coze_is_async_var.get() + save_config(CONFIG) + + self.edit_status_label.config(text="已保存", foreground="green") + self.after(2000, lambda: self.edit_status_label.config(text="")) + messagebox.showinfo("保存成功", f"模板 '{template['name']}' 配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存Coze配置时出错:{e}") + + def save_baidu_config(self): + # 保存百度API配置 + try: + CONFIG['Baidu']['api_key'] = self.baidu_api_key_var.get() + CONFIG['Baidu']['secret_key'] = self.baidu_secret_key_var.get() + CONFIG['Baidu']['enable_detection'] = self.baidu_enable_detection_var.get() + + save_config(CONFIG) + messagebox.showinfo("保存成功", "百度API配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存百度API配置时出错:{e}") + + def save_image_config(self): + # 保存图片处理配置 + try: + CONFIG['ImageModify']['crop_percent'] = self.crop_percent_var.get() + CONFIG['ImageModify']['min_rotation'] = self.min_rotation_var.get() + CONFIG['ImageModify']['max_rotation'] = self.max_rotation_var.get() + CONFIG['ImageModify']['min_brightness'] = self.min_brightness_var.get() + CONFIG['ImageModify']['max_brightness'] = self.max_brightness_var.get() + CONFIG['ImageModify']['watermark_text'] = self.watermark_text_var.get() + CONFIG['ImageModify']['watermark_opacity'] = self.watermark_opacity_var.get() + CONFIG['ImageModify']['overlay_opacity'] = 
self.overlay_opacity_var.get() + + save_config(CONFIG) + messagebox.showinfo("保存成功", "图片处理配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存图片处理配置时出错:{e}") + + def save_banned_words(self): + # 处理文本,将换行符替换为逗号 + words = self.banned_words_text.get(1.0, tk.END).strip().replace('\n', ',') + CONFIG['Keywords']['banned_words'] = words + save_config(CONFIG) + messagebox.showinfo("保存成功", "违禁词列表已更新") + + # 同步到主页面 + self.generation_type_var.set(self.coze_generation_type_var.get()) + self.update_template_list() + + def on_generation_type_changed(self, event=None): + """主页面生成类型改变时的处理""" + # 同步到coze页面 + self.coze_generation_type_var.set(self.generation_type_var.get()) + self.update_template_list() + + def update_template_list(self): + """更新模板列表显示""" + current_type = self.coze_generation_type_var.get() + self.template_listbox.delete(0, tk.END) + + if current_type in self.templates: + for template in self.templates[current_type]: + self.template_listbox.insert(tk.END, template['name']) + + def on_template_selected(self, event=None): + """模板选择时的处理""" + selection = self.template_listbox.curselection() + if selection: + index = selection[0] + current_type = self.coze_generation_type_var.get() + if current_type in self.templates and index < len(self.templates[current_type]): + template = self.templates[current_type][index] + self.load_template_config(template) + + # 更新上次使用的模板信息 + CONFIG['Coze']['last_used_template'] = template['name'] + CONFIG['Coze']['last_used_template_type'] = current_type + save_config(CONFIG) # 保存配置文件 + + def load_template_config(self, template): + """加载模板配置到界面""" + # 解绑之前的变量跟踪 + self._unbind_var_trace() + + self.template_name_var.set(template['name']) + self.coze_workflow_id_var.set(template.get('workflow_id', '')) + self.coze_access_token_var.set(template.get('access_token', '')) + self.coze_is_async_var.set(template.get('is_async', 'true')) + # self.coze_input_data_template_var.set(template.get('input_data_template', '')) + + 
self.edit_status_label.config(text="已加载", foreground="blue") + self.after(2000, lambda: self.edit_status_label.config(text="")) + + # 重新绑定变量跟踪 + self._setup_var_trace() + + def _setup_var_trace(self): + """设置变量跟踪以显示编辑状态""" + self.var_traces = [] + for var in [self.template_name_var, self.coze_workflow_id_var, + self.coze_access_token_var, self.coze_is_async_var]: + # self.coze_input_data_template_var]: + trace_id = var.trace_add('write', lambda *args: self._show_edit_status()) + self.var_traces.append((var, trace_id)) + + def _unbind_var_trace(self): + """解绑变量跟踪""" + if hasattr(self, 'var_traces'): + for var, trace_id in self.var_traces: + try: + var.trace_remove('write', trace_id) + except Exception: + pass + self.var_traces = [] + + def _show_edit_status(self): + """显示编辑状态""" + self.edit_status_label.config(text="未保存", foreground="red") + + def add_template(self): + """添加新模板""" + current_type = self.coze_generation_type_var.get() + if current_type not in self.templates: + self.templates[current_type] = [] + + # 弹出对话框让用户输入模板名称 + new_template_name = simpledialog.askstring("新增模板", "请输入新模板的名称:") + if new_template_name: + new_template_name = new_template_name.strip() + if not new_template_name: + messagebox.showwarning("输入无效", "模板名称不能为空。") + return + + # 检查模板名称是否重复 + if any(t['name'] == new_template_name for t in self.templates[current_type]): + messagebox.showwarning("名称重复", f"模板名称 '{new_template_name}' 已存在,请使用其他名称。") + return + + new_template = { + 'name': new_template_name, + 'workflow_id': '', + 'access_token': '', + 'is_async': 'true', + # 'input_data_template': '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}' + } + self.templates[current_type].append(new_template) + self.update_template_list() + self.save_templates() + + # 选中新添加的模板 + new_index = len(self.templates[current_type]) - 1 + self.template_listbox.selection_clear(0, tk.END) + self.template_listbox.selection_set(new_index) + # selection_set会触发on_template_selected事件,自动加载模板配置 + + # 
延迟设置状态,确保覆盖on_template_selected中设置的状态 + self.after(100, lambda: self.edit_status_label.config(text="已添加", foreground="green")) + self.after(2100, lambda: self.edit_status_label.config(text="")) + else: + messagebox.showinfo("取消操作", "已取消新增模板。") + + def delete_template(self): + """删除选中的模板""" + selection = self.template_listbox.curselection() + if not selection: + messagebox.showwarning("提示", "请先选择要删除的模板") + return + + index = selection[0] + current_type = self.coze_generation_type_var.get() + if current_type not in self.templates or index >= len(self.templates[current_type]): + return + + template_name = self.templates[current_type][index]['name'] + if messagebox.askyesno("确认删除", f"确定要删除模板 '{template_name}' 吗?"): + del self.templates[current_type][index] + self.update_template_list() + self.save_templates() + + # 清除配置并更新状态 + self.clear_template_config() + self.edit_status_label.config(text=f"已删除 '{template_name}'", foreground="red") + self.after(2000, lambda: self.edit_status_label.config(text="")) + + # 如果还有模板,选中最后一个 + if self.templates[current_type]: + last_index = len(self.templates[current_type]) - 1 + self.template_listbox.selection_set(last_index) + self.load_template_config(self.templates[current_type][last_index]) + + def validate_template(self): + """验证模板配置""" + name = self.template_name_var.get().strip() + workflow_id = self.coze_workflow_id_var.get().strip() + access_token = self.coze_access_token_var.get().strip() + # input_template = self.coze_input_data_template_var.get().strip() + + if not name: + messagebox.showerror("错误", "模板名称不能为空") + return False + + if not workflow_id: + messagebox.showerror("错误", "Workflow ID不能为空") + return False + + if not access_token: + messagebox.showerror("错误", "Access Token不能为空") + return False + + # if not input_template: + # messagebox.showerror("错误", "输入数据模板不能为空") + # return False + # + # # 验证输入数据模板的JSON格式 + # try: + # # 替换占位符以便验证JSON格式 + # test_template = input_template.replace('{article_text}', '""')\ + # 
.replace('{link}', '""')\ + # .replace('{weijin}', '""')\ + # .replace('{title_text}', '""') + # json.loads(test_template) + # except json.JSONDecodeError as e: + # messagebox.showerror("错误", f"输入数据模板不是有效的JSON格式:\n{str(e)}") + # return False + + return True + + def save_template(self): + """保存当前模板配置""" + if not self.validate_template(): + return + + selection = self.template_listbox.curselection() + if selection: + index = selection[0] + current_type = self.coze_generation_type_var.get() + if current_type in self.templates and index < len(self.templates[current_type]): + template = self.templates[current_type][index] + new_name = self.template_name_var.get().strip() + if not new_name: + messagebox.showwarning("输入无效", "模板名称不能为空。") + return + + # 检查新名称是否重复,排除当前模板自身 + if new_name != template['name'] and any(t['name'] == new_name for t in self.templates[current_type]): + messagebox.showwarning("名称重复", f"模板名称 '{new_name}' 已存在,请使用其他名称。") + return + + template['name'] = new_name + template['workflow_id'] = self.coze_workflow_id_var.get().strip() + template['access_token'] = self.coze_access_token_var.get().strip() + template['is_async'] = self.coze_is_async_var.get() + # template['input_data_template'] = self.coze_input_data_template_var.get().strip() + + # 更新上次使用的模板信息 + CONFIG['Coze']['last_used_template'] = template['name'] + CONFIG['Coze']['last_used_template_type'] = current_type + + self.update_template_list() + self.save_templates() + save_config(CONFIG) # 保存配置文件 + + self.edit_status_label.config(text="已保存", foreground="green") + self.after(2000, lambda: self.edit_status_label.config(text="")) + else: + messagebox.showwarning("未选择模板", "请先选择要保存的模板") + + def rename_template(self): + """重命名当前选中的模板""" + selection = self.template_listbox.curselection() + if not selection: + messagebox.showwarning("未选择模板", "请先选择要重命名的模板") + return + + index = selection[0] + current_type = self.coze_generation_type_var.get() + if current_type not in self.templates or index >= 
def duplicate_template(self):
    """Copy the selected template under a unique ``<name>_副本[N]`` name.

    The copy is appended to the list for the current generation type,
    persisted, selected in the listbox and loaded into the edit form.
    Returns ``None``; warns when nothing is selected and silently returns
    when the selection does not map to an existing template.
    """
    selection = self.template_listbox.curselection()
    if not selection:
        messagebox.showwarning("提示", "请先选择要复制的模板")
        return

    index = selection[0]
    current_type = self.coze_generation_type_var.get()
    if current_type not in self.templates or index >= len(self.templates[current_type]):
        return

    template = self.templates[current_type][index]
    new_template = template.copy()

    # First try "<name>_副本", then "<name>_副本2", "<name>_副本3", ...
    base_name = template['name']
    copy_num = 1
    new_name = f"{base_name}_副本"
    while any(t['name'] == new_name for t in self.templates[current_type]):
        copy_num += 1
        new_name = f"{base_name}_副本{copy_num}"
    new_template['name'] = new_name

    self.templates[current_type].append(new_template)
    self.update_template_list()
    self.save_templates()

    # Select the freshly created copy.
    new_index = len(self.templates[current_type]) - 1
    self.template_listbox.selection_clear(0, tk.END)
    self.template_listbox.selection_set(new_index)
    # Tk raises <<ListboxSelect>> only for user-driven selection changes, so
    # a programmatic selection_set() does not run the select handler. Load
    # the copy explicitly, as delete_template and use_template already do.
    self.load_template_config(new_template)

    # Set the status slightly later so it wins over any status written
    # while the template was being loaded.
    self.after(100, lambda: self.edit_status_label.config(text="已复制", foreground="green"))
    self.after(2100, lambda: self.edit_status_label.config(text=""))
template_listbox.curselection() + if not selection: + messagebox.showwarning("未选择模板", "请先选择要使用的模板", parent=dialog) + return + + index = selection[0] + current_type = dialog_type_var.get() + if current_type not in self.templates or index >= len(self.templates[current_type]): + return + + selected_template = self.templates[current_type][index] + + # 应用所选模板的配置 + self.coze_generation_type_var.set(current_type) # 更新生成类型 + self.generation_type_var.set(current_type) # 同步到主页面 + + # 更新工作流配置 + self.coze_workflow_id_var.set(selected_template.get('workflow_id', '')) + self.coze_access_token_var.set(selected_template.get('access_token', '')) + self.coze_is_async_var.set(selected_template.get('is_async', 'true')) + # self.coze_input_data_template_var.set(selected_template.get('input_data_template', '')) + + # 更新CONFIG配置 + CONFIG['Coze']['workflow_id'] = selected_template.get('workflow_id', '') + CONFIG['Coze']['access_token'] = selected_template.get('access_token', '') + CONFIG['Coze']['is_async'] = selected_template.get('is_async', 'true') + # CONFIG['Coze']['input_data_template'] = selected_template.get('input_data_template', '') + + # 保存上次使用的模板信息 + CONFIG['Coze']['last_used_template'] = selected_template['name'] + CONFIG['Coze']['last_used_template_type'] = current_type + + # 保存配置 + save_config(CONFIG) + + # 更新模板列表和选中状态 + self.update_template_list() + for i in range(self.template_listbox.size()): + if self.template_listbox.get(i) == selected_template['name']: + self.template_listbox.selection_set(i) + break + + # 加载模板配置到界面 + self.load_template_config(selected_template) + + # 延迟设置状态,确保覆盖load_template_config设置的状态 + self.after(100, lambda: self.edit_status_label.config(text=f"已应用模板 '{selected_template['name']}'", + foreground="green")) + self.after(2100, lambda: self.edit_status_label.config(text="")) + + # 关闭对话框 + dialog.destroy() + + # 添加确定和取消按钮 + ttk.Button(button_frame, text="确定", command=on_confirm).pack(side=tk.RIGHT, padx=5) + ttk.Button(button_frame, text="取消", 
def load_templates(self):
    """Populate ``self.templates`` from the ``Templates`` section of CONFIG.

    Each ``templates_<type>`` option is expected to hold a JSON array of
    template dicts. Only types already present in ``self.templates`` are
    loaded. A value that is not a string, not valid JSON or not a JSON
    array is replaced by an empty list, and array items that are not
    dicts with a ``name`` key are dropped, because the rest of the class
    indexes templates by position and reads ``t['name']``. Any unexpected
    error resets both template lists to empty.
    """
    try:
        if 'Templates' in CONFIG:
            templates_section = CONFIG['Templates']
            for key in templates_section:
                if not key.startswith('templates_'):
                    continue
                template_type = key.replace('templates_', '')
                if template_type not in self.templates:
                    continue

                value = templates_section[key]
                if not isinstance(value, str):
                    logger.warning(f"模板配置{key}的值不是字符串类型: {type(value)}")
                    self.templates[template_type] = []
                    continue

                try:
                    parsed = json.loads(value)
                except json.JSONDecodeError as e:
                    logger.warning(f"解析模板配置{key}失败: {e}")
                    parsed = []

                # Valid JSON is not enough: "{}" or "null" would break
                # every later len()/append()/t['name'] access.
                if not isinstance(parsed, list):
                    logger.warning(f"模板配置{key}不是列表,已忽略")
                    parsed = []

                self.templates[template_type] = [
                    t for t in parsed if isinstance(t, dict) and 'name' in t
                ]

        # Both known template types must always have a list.
        for template_type in ["短篇", "文章"]:
            self.templates.setdefault(template_type, [])
    except Exception as e:
        logger.error(f"加载模板配置失败: {e}")
        self.templates = {"短篇": [], "文章": []}
def get_current_template(self):
    """Return the template dict to use for processing.

    When the listbox selection maps to a stored template of the current
    generation type, that stored dict is returned. Otherwise a new dict is
    assembled from the values currently shown in the form, with the name
    falling back to ``'默认模板'`` when the name field is empty.
    """
    chosen = self.template_listbox.curselection()
    if chosen:
        position = chosen[0]
        kind = self.coze_generation_type_var.get()
        if kind in self.templates:
            stored = self.templates[kind]
            if position < len(stored):
                return stored[position]

    # No usable selection: snapshot the form instead.
    return dict(
        name=self.template_name_var.get() or '默认模板',
        type=self.coze_generation_type_var.get(),
        workflow_id=self.coze_workflow_id_var.get(),
        access_token=self.coze_access_token_var.get(),
        is_async=self.coze_is_async_var.get(),
    )
def apply_image_modifications(self, img):
    """Apply the configured crop/rotate/brightness/contrast/saturation effects.

    ``crop_percent`` is the fraction trimmed from *each* edge, matching
    the edge-crop step of the processing code in this file. Rotation,
    brightness, contrast and saturation each use a random value drawn
    from the configured ``[min, max]`` range.

    :param img: image object exposing ``size``, ``crop`` and ``rotate``
        (presumably a ``PIL.Image.Image`` — the caller in this file passes
        the result of ``Image.new``).
    :return: the modified image. If any step fails, an error dialog is
        shown and the image as it stood at the point of failure is returned.
    """
    width, height = img.size

    try:
        # Read every parameter up front so a bad value aborts before any
        # modification is applied.
        crop_percent = float(self.crop_percent_var.get())
        min_rotation = float(self.min_rotation_var.get())
        max_rotation = float(self.max_rotation_var.get())
        min_brightness = float(self.min_brightness_var.get())
        max_brightness = float(self.max_brightness_var.get())
        min_contrast = float(self.min_contrast_var.get())
        max_contrast = float(self.max_contrast_var.get())
        min_saturation = float(self.min_saturation_var.get())
        max_saturation = float(self.max_saturation_var.get())

        # Trimming half or more from both sides would leave nothing.
        if not 0 <= crop_percent < 0.5:
            raise ValueError(f"裁剪比例必须在 0 到 0.5 之间: {crop_percent}")

        # Crop: trim crop_percent off every edge. The previous code kept
        # only crop_percent of the image (2 % with the default 0.02).
        crop_px_w = int(width * crop_percent)
        crop_px_h = int(height * crop_percent)
        img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))

        # Rotate by a random angle within the configured range.
        rotation_angle = random.uniform(min_rotation, max_rotation)
        img = img.rotate(rotation_angle)

        # Brightness
        brightness_factor = random.uniform(min_brightness, max_brightness)
        img = ImageEnhance.Brightness(img).enhance(brightness_factor)

        # Contrast
        contrast_factor = random.uniform(min_contrast, max_contrast)
        img = ImageEnhance.Contrast(img).enhance(contrast_factor)

        # Saturation
        saturation_factor = random.uniform(min_saturation, max_saturation)
        img = ImageEnhance.Color(img).enhance(saturation_factor)

        return img
    except Exception as e:
        messagebox.showerror("应用效果失败", f"应用图片修改效果时出错:{e}")
        return img
run_processing(self, excel_path, num_threads, ai_service, generation_type=None, current_template=None): + """运行处理任务""" + try: + # 读取Excel文件 + df = pd.read_excel(excel_path) + + # 获取关键词列表 + keywords = self.keywords_var.get().split(",") + + # 获取图片处理参数 + crop_percent = float(self.crop_percent_var.get()) + min_rotation = float(self.min_rotation_var.get()) + max_rotation = float(self.max_rotation_var.get()) + min_brightness = float(self.min_brightness_var.get()) + max_brightness = float(self.max_brightness_var.get()) + min_saturation = float(self.min_saturation_var.get()) + max_saturation = float(self.max_saturation_var.get()) + watermark_text = self.watermark_text_var.get() + watermark_opacity = int(self.watermark_opacity_var.get()) + overlay_opacity = int(self.overlay_opacity_var.get()) + + # 1. 裁剪边缘 + crop_px_w = int(width * crop_percent) + crop_px_h = int(height * crop_percent) + img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h)) + + # 2. 随机旋转 + angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1]) + img = img.rotate(angle, expand=True) + + # 3. 调整亮度 + enhancer = ImageEnhance.Brightness(img) + factor = random.uniform(min_brightness, max_brightness) + img = enhancer.enhance(factor) + + # 4. 
def run_processing(self, excel_path, num_threads, ai_service, generation_type=None, current_template=None):
    """Run the link-processing job; intended to execute on a worker thread.

    When ``ai_service`` is ``'coze'`` and a template is given, the
    template's workflow settings temporarily replace those in
    ``CONFIG['Coze']`` for the duration of the run and are restored
    afterwards. Results and errors are reported through message boxes
    scheduled with ``self.after``; the start button is re-enabled and
    ``self.running`` cleared in all cases.

    :param excel_path: path of the Excel file chosen in the UI.
    :param num_threads: number of worker threads passed to ``link_to_text``.
    :param ai_service: workflow provider name (``'coze'`` or ``'dify'``).
    :param generation_type: generation type passed to ``link_to_text``.
    :param current_template: template dict, or ``None``.
    """
    # NOTE(review): TITLE_BASE_PATH arrives via `from config import *`, so
    # this rebinding only changes this module's copy. link_to_text is not
    # given excel_path — confirm the chosen file actually reaches it.
    global TITLE_BASE_PATH

    # Bound before `try` so the `finally` block can always read it.
    original_config = None
    try:
        TITLE_BASE_PATH = excel_path
        start_time = time.time()

        # Temporarily override the Coze settings with the template's values.
        if current_template and ai_service == 'coze':
            original_config = {
                'workflow_id': CONFIG['Coze']['workflow_id'],
                'access_token': CONFIG['Coze']['access_token'],
                'is_async': CONFIG['Coze']['is_async'],
            }

            CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
            CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
            CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')

            logger.info(f"应用模板配置: {current_template.get('name')}")
            logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
            # The token itself is never logged, only a mask of equal length.
            logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
            logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")

        logger.info(f"开始处理链接,使用 {num_threads} 个线程,生成类型: {generation_type}")
        if current_template:
            logger.info(f"使用模板: {current_template.get('name', '未命名')}")
        results = link_to_text(num_threads=num_threads, ai_service=ai_service, current_template=current_template,
                               generation_type=generation_type)

        # Each result is unpacked as a 3-tuple whose middle item is the
        # success flag.
        total_links = len(results)
        success_links = sum(1 for _, success, _ in results if success)
        failed_links = total_links - success_links

        elapsed_time = time.time() - start_time

        logger.info(
            f"处理完成,共处理 {total_links} 个链接,成功 {success_links} 个,失败 {failed_links} 个")
        logger.info(f"总耗时: {elapsed_time:.2f} 秒")

        # Show the summary from the Tk event loop rather than this thread.
        self.after(0, lambda: messagebox.showinfo("处理完成",
                                                  f"共处理 {total_links} 个链接\n成功: {success_links} 个\n失败: {failed_links} 个\n总耗时: {elapsed_time:.2f} 秒"))
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"处理任务出错: {e}")
        # Copy the message now: `e` is unbound once the except block ends,
        # but the lambda runs later.
        error_msg = str(e)
        self.after(0, lambda: messagebox.showerror("处理错误", f"处理任务出错: {error_msg}"))
    finally:
        # Restore the Coze settings that were overridden above.
        if original_config is not None:
            CONFIG['Coze']['workflow_id'] = original_config['workflow_id']
            CONFIG['Coze']['access_token'] = original_config['access_token']
            CONFIG['Coze']['is_async'] = original_config['is_async']

        self.after(0, lambda: self.start_button.config(state=tk.NORMAL))
        self.running = False
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget."""

    def __init__(self, text_widget):
        """Remember the widget that receives the log lines."""
        super().__init__()
        self.text_widget = text_widget

    def _append_line(self, line):
        """Insert *line* at the end of the widget and scroll to it."""
        # The widget is switched to NORMAL only while inserting and put
        # back to DISABLED afterwards.
        self.text_widget.configure(state=tk.NORMAL)
        self.text_widget.insert(tk.END, line + '\n')
        self.text_widget.see(tk.END)
        self.text_widget.configure(state=tk.DISABLED)

    def emit(self, record):
        """Format *record* and schedule the widget update via ``after``."""
        formatted = self.format(record)
        # Hand the update to the widget's own scheduler instead of
        # touching the widget directly from the logging call site.
        self.text_widget.after(0, self._append_line, formatted)
def call_coze_workflow(parameters):
    """
    Run the configured Coze workflow and return the raw response.

    The workflow id, access token and async flag are read from
    ``CONFIG['Coze']``.

    :param parameters: dict of input parameters passed to the workflow
    :return: the response body as text on HTTP 200; otherwise a dict with
        ``error`` and ``detail`` keys
    """
    logger.info("Coze开始工作。。。。")
    coze = CONFIG['Coze']
    workflow_id = coze['workflow_id']
    access_token = coze['access_token']
    run_async = coze['is_async'].lower() == 'true'

    reply = requests.post(
        "https://api.coze.cn/v1/workflow/run",
        json={
            "workflow_id": workflow_id,
            "parameters": parameters,
            "is_async": run_async,
        },
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )

    if reply.status_code != 200:
        return {
            "error": f"请求失败,状态码:{reply.status_code}",
            "detail": reply.text
        }
    return reply.text
def call_coze_all_article_workflow(parameters, is_async=False):
    """
    Run the configured Coze workflow and return its title and article.

    The request is always sent synchronously, because the title and
    article are read from the response immediately. This matches the
    previous behaviour: the old code compared a lower-cased config value
    with ``'False'``, which can never match.

    :param parameters: dict of input parameters passed to the workflow
    :param is_async: accepted for backward compatibility; ignored
    :return: ``(title, article)`` on HTTP 200; otherwise a dict with
        ``error`` and ``detail`` keys
    :raises ValueError: if the response body or its ``data`` field is not
        valid JSON
    :raises KeyError: if ``data``, ``title`` or ``article`` is missing
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": False
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

    # The body is JSON, not a Python literal: ast.literal_eval rejects
    # true/false/null, so parse it with the json module.
    result_dict = json.loads(response.text)
    logger.debug("Coze workflow response: %s", result_dict)

    # `data` was parsed as a string by the previous code, i.e. it carries
    # the workflow output as embedded JSON; tolerate an already-decoded
    # object as well.
    data_field = result_dict['data']
    data_dict = json.loads(data_field) if isinstance(data_field, str) else data_field

    return data_dict['title'], data_dict['article']
def load_config():
    """Load ``CONFIG_FILE``, fill in missing defaults, and write it back.

    If the file does not exist, the parser is seeded with every section of
    ``DEFAULT_CONFIG``. If it exists, it is read and only the sections and
    options missing from it are added. In both cases the resulting
    configuration is written to ``CONFIG_FILE`` and the parser returned.
    """
    parser = configparser.ConfigParser()

    if os.path.exists(CONFIG_FILE):
        parser.read(CONFIG_FILE, encoding='utf-8')

        # Add whatever the file on disk lacks; never overwrite a value
        # that is already there.
        for section_name, defaults in DEFAULT_CONFIG.items():
            if not parser.has_section(section_name):
                parser[section_name] = {}
            for option_name, default_value in defaults.items():
                if parser.has_option(section_name, option_name):
                    continue
                parser[section_name][option_name] = default_value
    else:
        # First run: start from the defaults.
        for section_name, defaults in DEFAULT_CONFIG.items():
            parser[section_name] = defaults

    # Both paths persist the result.
    with open(CONFIG_FILE, 'w', encoding='utf-8') as handle:
        parser.write(handle)

    return parser
def check_link_exists(host, user, password, database, link):
    """
    Report whether ``link`` is already stored in the ``links`` table.

    This function only queries; it does not insert anything.

    :param host: MySQL host address
    :param user: MySQL user name
    :param password: MySQL password
    :param database: database name
    :param link: the link to look up
    :return: True if a matching row exists; False if none exists or a
        database error occurred (the error is printed)
    """
    conn = None  # stays None if connect() itself fails

    try:
        conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )

        with conn.cursor() as cur:
            cur.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            row = cur.fetchone()

        return bool(row)

    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return False
    finally:
        # Close the connection whether or not the query succeeded.
        if conn:
            conn.close()
def extract_images_from_html(html_content):
    """Collect Toutiao image URLs from an HTML document.

    Looks at the ``src`` and ``data-src`` attributes of every ``<img>`` tag
    and keeps the values that start with
    ``https://p3-sign.toutiaoimg.com/tos-cn-i``. Duplicates are removed
    while the first-seen order is preserved.

    :param html_content: HTML markup to parse
    :return: ``{"image": [url, ...]}``
    """
    wanted_prefix = "https://p3-sign.toutiaoimg.com/tos-cn-i"
    document = BeautifulSoup(html_content, 'html.parser')

    # Every (tag, attribute) pair in document order, src before data-src.
    attribute_values = (
        tag.get(attribute)
        for tag in document.find_all('img')
        for attribute in ('src', 'data-src')
    )
    matches = [value for value in attribute_values if value and value.startswith(wanted_prefix)]

    # dict.fromkeys de-duplicates and keeps insertion order.
    unique_matches = list(dict.fromkeys(matches))

    return {"image": unique_matches}
'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Connection': 'keep-alive', + 'Upgrade-Insecure-Requests': '1' + } + + try: + # 添加随机延迟,模拟人类行为 + time.sleep(random.uniform(1, 3)) + response = requests.get(url, headers=headers, timeout=10) + response.encoding = 'utf-8' + + # 检查响应状态 + if response.status_code == 200: + return response.text + else: + print(f"请求失败,状态码: {response.status_code}") + return None + except Exception as e: + print(f"获取网页源代码时出错: {e}") + return None + + +# def get_webpage_source_selenium(url): +# """ +# 使用Selenium获取网页源代码,适用于动态加载内容的网站 +# """ +# # 配置Chrome选项 +# chrome_options = Options() +# chrome_options.add_argument('--headless') # 无头模式 +# chrome_options.add_argument('--disable-gpu') +# chrome_options.add_argument('--no-sandbox') +# chrome_options.add_argument('--disable-dev-shm-usage') +# chrome_options.add_argument('--disable-blink-features=AutomationControlled') +# chrome_options.add_argument( +# 'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36') +# +# # 初始化WebDriver +# driver = webdriver.Chrome(options=chrome_options) +# +# try: +# # 访问URL +# driver.get(url) +# +# # 等待页面加载完成(可根据实际情况调整等待条件) +# time.sleep(3) # 简单等待3秒 +# +# # 尝试等待文章内容加载 +# try: +# WebDriverWait(driver, 10).until( +# EC.presence_of_element_located((By.TAG_NAME, "article")) +# ) +# except: +# print("等待文章元素超时,将使用当前页面内容") +# +# # 获取页面源代码 +# page_source = driver.page_source +# +# # 保存源代码到文件 +# with open("toutiao_source_selenium.html", "w", encoding="utf-8") as f: +# f.write(page_source) +# +# return page_source +# except Exception as e: +# print(f"使用Selenium获取网页源代码时出错: {e}") +# return None +# finally: +# # 关闭浏览器 +# driver.quit() + + +# =====================采集内容内容================================== +# def toutiao_w_extract_content(url): +# """ +# 使用requests和BeautifulSoup提取头条页面内容 +# """ +# html_content = 
def _parse_title_and_article(html_content, title_selector, article_selector):
    """Parse the page once and return (title_text, article_text, article_element)."""
    soup = BeautifulSoup(html_content, 'html.parser')
    title_element = soup.select_one(title_selector)
    article_element = soup.select_one(article_selector)

    title_text = title_element.get_text().strip() if title_element else ""
    article_text = article_element.get_text().strip() if article_element else ""
    return title_text, article_text, article_element


def toutiao_extract_content(url):
    """
    Extract title, body text and image URLs from a Toutiao article page.

    The page is rendered through ``get_webpage_source_selenium`` and matched
    against the fixed Toutiao layout selectors. Images are collected from the
    whole page by ``extract_images_from_html``.

    :param url: article address
    :return: ``(title_text, article_text, img_urls)``; ``("", "", [])`` when the
             page source could not be fetched
    """
    html_content = get_webpage_source_selenium(url)
    # The fetcher returns None on failure; parsing None would raise.
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []

    title_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1'
    article_selector = '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article'
    title_text, article_text, _ = _parse_title_and_article(
        html_content, title_selector, article_selector)

    img_urls = extract_images_from_html(html_content)['image']
    return title_text, article_text, img_urls


def wechat_extract_content(url):
    """
    Extract title, body text and image URLs from a WeChat official-account page.

    Only images inside ``#js_content`` whose ``src`` (or, failing that,
    ``data-src``) starts with ``https://mmbiz.qpic.cn`` are returned.

    :param url: article address
    :return: ``(title_text, article_text, img_urls)``; ``("", "", [])`` when the
             page source could not be fetched
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []

    title_text, article_text, article_element = _parse_title_and_article(
        html_content, '#activity-name', '#js_content')

    img_urls = []
    for img in (article_element.select('img') if article_element else []):
        src = img.get('src') or img.get('data-src')
        if src and src.startswith('https://mmbiz.qpic.cn'):
            img_urls.append(src)

    return title_text, article_text, img_urls


def wangyi_extract_content(url):
    """
    Extract title, body text and image URLs from a NetEase (163.com) article.

    Images are every ``<img>`` with a ``src`` inside the article body element.

    :param url: article address
    :return: ``(title_text, article_text, img_urls)``; ``("", "", [])`` when the
             page source could not be fetched
    """
    html_content = get_webpage_source_selenium(url)
    if not html_content:
        print("获取HTML内容失败")
        return "", "", []

    title_text, article_text, article_element = _parse_title_and_article(
        html_content, '#contain > div.post_main > h1', '#content > div.post_body')

    img_elements = article_element.select('img') if article_element else []
    img_urls = [img.get('src') for img in img_elements if img.get('src')]

    return title_text, article_text, img_urls
def toutiao_w_extract_content(url):
    """
    Extract title, body text and in-article image URLs from a Toutiao page.

    Title and article are located by trying a prioritized list of CSS
    selectors and taking the first one that matches. Image URLs are taken only
    from the matched article element; if none are found there, the page-wide
    ``extract_images_from_html`` result is used instead.

    :param url: article address
    :return: ``(title_text, article_text, img_urls)``; ``("", "", [])`` when the
             page source could not be fetched
    """
    page_html = get_webpage_source_selenium(url)
    if not page_html:
        print("获取HTML内容失败")
        return "", "", []

    document = BeautifulSoup(page_html, 'html.parser')

    def first_match(selectors):
        # Return the first element matched by any selector, in priority order.
        for css in selectors:
            node = document.select_one(css)
            if node:
                return node
        return None

    title_node = first_match((
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > h1',
        'h1.article-title',
        'h1[data-testid="headline"]',
        '.article-title h1',
        '.article-header h1',
        'article h1',
        'h1',
    ))
    title_text = title_node.get_text().strip() if title_node else ""

    article_element = first_match((
        '#root > div.article-detail-container > div.main > div.show-monitor > div > div > div > div > div > article',
        'article',
        '.article-content',
        '.content',
        '#js_content',
        '.post_body',
        '[data-testid="article-content"]',
    ))
    article_text = article_element.get_text().strip() if article_element else ""

    accepted_hosts = ('toutiaoimg.com', 'p3-sign.toutiaoimg.com', 'byteimg.com')
    url_attributes = ('src', 'data-src', 'data-original', 'data-lazy-src')

    img_urls = []
    if article_element:
        for image_tag in article_element.find_all('img'):
            for attribute in url_attributes:
                candidate = image_tag.get(attribute)
                if not candidate:
                    continue

                # Expand protocol-relative and site-relative paths.
                if candidate.startswith('//'):
                    candidate = 'https:' + candidate
                elif candidate.startswith('/'):
                    candidate = 'https://www.toutiao.com' + candidate

                # Keep Toutiao-hosted images only; the first accepted attribute wins.
                if any(host in candidate for host in accepted_hosts):
                    img_urls.append(candidate)
                    break

    # Fallback: page-wide scan when the article element yielded nothing.
    if not img_urls:
        page_wide = extract_images_from_html(page_html)
        if page_wide and 'image' in page_wide:
            img_urls = page_wide['image']

    img_urls = list(dict.fromkeys(img_urls))
    return title_text, article_text, img_urls
def crop_and_replace_images(folder_path):
    """
    Crop the bottom 10% off every ``.jpg`` in a folder and stretch it back.

    Each file is overwritten in place with an image of the original
    dimensions.

    :param folder_path: directory whose ``.jpg`` files are processed
    :return: None
    """
    print("开始处理图片。。。。")
    for filename in os.listdir(folder_path):
        if not filename.lower().endswith('.jpg'):
            continue
        file_path = os.path.join(folder_path, filename)
        print("文件夹路径:" + folder_path)
        print("文件路径:" + file_path)
        with Image.open(file_path) as img:
            width, height = img.size
            print("裁剪图片。。。")
            # Drop the bottom tenth, then stretch back to the original size.
            cropped_img = img.crop((0, 0, width, round(height - height * 0.1)))
            resized_img = cropped_img.resize((width, height))
            # The JPEG writer cannot store alpha or palette modes.
            if resized_img.mode not in ('RGB', 'L'):
                resized_img = resized_img.convert('RGB')
            # Pillow registers the format as "JPEG"; "jpg" is not a valid name.
            resized_img.save(file_path, 'JPEG')


def deduplicate_images(folder_path):
    """
    Run ``modify_image`` over every image under a folder, overwriting each file.

    The folder is walked recursively. A failure on one file is printed and does
    not stop the remaining files.

    :param folder_path: root directory to scan
    :return: None
    """
    print("开始对图片去重。。。")
    if not os.path.exists(folder_path):
        print("错误:输入文件夹不存在!")
        return

    supported_ext = ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.webp')

    for root, _, files in os.walk(folder_path):
        for file in files:
            if not file.lower().endswith(supported_ext):
                continue
            file_path = os.path.join(root, file)
            try:
                with Image.open(file_path) as img:
                    modified_img = modify_image(img)
                    modified_img.save(file_path)  # overwrite the source file
                    print(f"已处理并覆盖:{file_path}")
            except Exception as e:
                print(f"处理 {file_path} 时出错:{e}")


def download_image(image_url, save_path, timeout=30):
    """
    Download one image and write it to disk.

    :param image_url: address of the image
    :param save_path: destination file path
    :param timeout: seconds to wait for the server, so a stalled connection
                    cannot hang the worker thread indefinitely
    :return: True when the file was written, False otherwise
    """
    try:
        response = requests.get(image_url, timeout=timeout)
        if response.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(response.content)
            print(f"图片下载成功,保存路径为:{save_path}")
            return True
        print(f"图片下载失败,状态码为:{response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"请求出错:{e}")
    return False


def normalize_image_url(img_url):
    """
    Return an absolute URL for an image reference.

    URLs that already carry an ``http://`` or ``https://`` scheme are returned
    unchanged; anything else (typically protocol-relative ``//host/path``) is
    prefixed with ``https:``.
    """
    if img_url.startswith(("http://", "https://")):
        return img_url
    return "https:" + img_url


def download_and_process_images(img_urls, article_title, save_dir=None):
    """
    Download each image of an article and pass it through ``modify_image``.

    Images are saved under ``<save_dir>/<sanitized title>/图片<i>.jpg``.
    A failed download or a processing error is reported and skipped.

    :param img_urls: list of image URLs
    :param article_title: article title, used as the directory name
    :param save_dir: base directory; ``IMGS_BASE_PATH`` is used when None
    :return: None
    """
    if save_dir is None:
        save_dir = IMGS_BASE_PATH

    safe_title = safe_filename(article_title)
    img_dir_path = os.path.normpath(os.path.join(str(save_dir), safe_title))
    logger.info(f"图片保存路径:{img_dir_path}")
    safe_open_directory(img_dir_path)

    for i, img_url in enumerate(img_urls):
        imgurl = normalize_image_url(img_url)
        img_path = os.path.normpath(os.path.join(img_dir_path, f"图片{i}.jpg"))
        try:
            # Nothing to post-process when the download did not produce a file.
            if not download_image(imgurl, img_path):
                continue
            # Only the image just downloaded is modified, not the whole folder.
            with Image.open(img_path) as img:
                modified_img = modify_image(img)
                modified_img.save(img_path)
                print(f"已处理并覆盖:{img_path}")
        except Exception as e:
            logging.error(f"处理图片失败: {e}")
img_urls: 图片URL列表 +# :param article_title: 文章标题 +# :param save_dir: 自定义保存目录,如果为None则使用默认目录 +# """ +# if save_dir is None: +# save_dir = IMGS_BASE_PATH +# +# img_dir_path = os.path.join(str(save_dir), str(article_title)) +# logger.info(f"图片保存路径:{img_dir_path}") +# safe_open_directory(img_dir_path) +# +# for i, img_url in enumerate(img_urls): +# if img_url.startswith("https"): +# imgurl = img_url +# else: +# imgurl = "https:"+img_url +# img_path = os.path.join(img_dir_path, f"图片{i}.jpg") +# try: +# download_image(imgurl, img_path) +# # crop_and_replace_images(img_dir_path) +# deduplicate_images(img_dir_path) +# except Exception as e: +# logging.error(f"处理图片失败: {e}") + + +# def modify_image(img): +# print("修改图片") +# """对图片应用去重处理,不翻转,仅裁剪、旋转、亮度调整、添加水印、加透明蒙版""" +# width, height = img.size +# +# # 从配置中获取参数 +# crop_percent = float(CONFIG['ImageModify']['crop_percent']) +# min_rotation = float(CONFIG['ImageModify']['min_rotation']) +# max_rotation = float(CONFIG['ImageModify']['max_rotation']) +# min_brightness = float(CONFIG['ImageModify']['min_brightness']) +# max_brightness = float(CONFIG['ImageModify']['max_brightness']) +# watermark_text = CONFIG['ImageModify']['watermark_text'] +# watermark_opacity = int(CONFIG['ImageModify']['watermark_opacity']) +# overlay_opacity = int(CONFIG['ImageModify']['overlay_opacity']) +# +# # 1. 裁剪边缘 +# crop_px_w = int(width * crop_percent) +# crop_px_h = int(height * crop_percent) +# img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h)) +# +# # 2. 随机旋转 +# angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1]) +# img = img.rotate(angle, expand=True) +# +# # 3. 调整亮度 +# enhancer = ImageEnhance.Brightness(img) +# factor = random.uniform(min_brightness, max_brightness) # 亮度调整因子 +# img = enhancer.enhance(factor) +# +# # 4. 
def modify_image(img):
    """
    Apply the de-duplication pipeline to an image.

    Steps, in order: crop 20px off the bottom, crop the edges by a configured
    ratio, rotate by a random angle, adjust brightness by a random factor, add
    a text watermark, add a translucent white overlay. Parameters are read
    from ``CONFIG['ImageModify']``.

    :param img: PIL.Image to process
    :return: processed PIL.Image in RGB mode
    """
    print("修改图片")
    if img.mode != 'RGB':
        img = img.convert('RGB')

    config = CONFIG['ImageModify']
    crop_percent = float(config['crop_percent'])
    min_rotation = float(config['min_rotation'])
    max_rotation = float(config['max_rotation'])
    min_brightness = float(config['min_brightness'])
    max_brightness = float(config['max_brightness'])
    watermark_text = config['watermark_text']
    watermark_opacity = int(config['watermark_opacity'])
    overlay_opacity = int(config['overlay_opacity'])

    img = crop_bottom(img, 20)
    img = crop_edges(img, crop_percent)
    img = random_rotate(img, min_rotation, max_rotation)
    img = adjust_brightness(img, min_brightness, max_brightness)
    img = add_watermark(img, watermark_text, watermark_opacity)
    img = add_overlay(img, overlay_opacity)
    # The watermark/overlay steps work in RGBA; callers get RGB back.
    return img.convert('RGB')


def crop_bottom(img, pixels):
    """
    Crop a fixed number of pixels off the bottom of an image.

    :param img: PIL.Image to crop
    :param pixels: number of rows to remove
    :return: cropped image, or ``img`` unchanged when it is not taller than
             ``pixels``
    """
    width, height = img.size
    if height <= pixels:
        return img
    return img.crop((0, 0, width, height - pixels))


def crop_edges(img, percent):
    """
    Crop all four edges of an image by a fraction of its size.

    :param img: PIL.Image to crop
    :param percent: fraction (0-1) of width/height removed from each side
    :return: cropped image, or ``img`` unchanged when the requested margins
             would leave no pixels (percent of 0.5 or more)
    """
    width, height = img.size
    crop_px_w = int(width * percent)
    crop_px_h = int(height * percent)
    # Margins that meet or cross would give an empty or inverted crop box.
    if width - 2 * crop_px_w <= 0 or height - 2 * crop_px_h <= 0:
        return img
    return img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))


def random_rotate(img, min_rotation, max_rotation):
    """
    Rotate an image by a random angle in either direction.

    :param img: PIL.Image to rotate
    :param min_rotation: smallest rotation magnitude
    :param max_rotation: largest rotation magnitude
    :return: rotated image; the canvas is expanded to fit the result
    """
    direction = random.choice([-1, 1])
    angle = random.uniform(min_rotation, max_rotation) * direction
    return img.rotate(angle, expand=True)


def adjust_brightness(img, min_brightness, max_brightness):
    """
    Scale image brightness by a random factor.

    :param img: PIL.Image to adjust
    :param min_brightness: lower bound of the factor
    :param max_brightness: upper bound of the factor
    :return: brightness-adjusted image
    """
    factor = random.uniform(min_brightness, max_brightness)
    return ImageEnhance.Brightness(img).enhance(factor)
def process_link(link_info, ai_service, current_template=None,generation_type=None):
    """
    Scrape one article, rewrite it through an AI workflow and save the result.

    The article text is written to ``ARTICLES_BASE_PATH/<type>/<name>.txt`` and
    its images are downloaded under ``IMGS_BASE_PATH/<type>/<name>/``.
    Processing stops silently (returns None) when no title was scraped, the
    title exceeds 100 characters, a banned word is found (dify only) or the
    compliance check rejects the text.

    :param link_info: ``(link, article_type)`` tuple
    :param ai_service: ``"dify"`` or ``"coze"``
    :param current_template: optional dict overriding the Coze workflow
                             settings for this call
    :param generation_type: ``"短篇"`` or ``"文章"``
    :raises Exception: any failure is logged and re-raised to the caller
    """
    link, article_type = link_info  # unpack link and category
    try:
        # Pick the scraper by site; unknown sites yield an empty result.
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_w_extract_content(link)
            if title_text == "":
                title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        elif link.startswith("https://www.163.com"):
            title_text, article_text, img_urls = wangyi_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []

        if title_text == "" or len(title_text) > 100:
            return

        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']

        # Banned-word check runs on the title.
        check_keywords = check_keywords_in_text(title_text)

        # Default title: the first sentence of the article body.
        title = extract_content_until_punctuation(article_text).replace("正文:", "")

        from datetime import datetime
        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)

        message_content = None

        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                check_link_insert(host, user, password, database, link)
                return
            input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {"old_article": article_text}
            message_content = call_dify_workflow(input_data)

        elif ai_service == "coze":
            logger.info("coze正在处理")
            logger.info(f"正在处理的文章类型为:{generation_type}")

            # NOTE(review): the template is applied by mutating the shared CONFIG,
            # which is not isolated between worker threads - confirm this is acceptable.
            original_config = None
            if current_template:
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async']
                }

                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')

                logger.info(f"应用模板配置: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")

            try:
                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("coze中输入:", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("原文中标题为:", title_text)
                    print("原文中内容为:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("发送的请求数据为:", input_data)
                    title, message_content = call_coze_all_article_workflow(input_data)
            finally:
                # Always restore the global Coze settings.
                if original_config is not None:
                    CONFIG['Coze'].update(original_config)

        # No branch produced text: unsupported service/type or an empty AI reply.
        if message_content is None:
            raise ValueError(
                f"未生成文章内容: ai_service={ai_service}, generation_type={generation_type}")

        title_text = title_text.strip()

        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)

        # Base name depends on the generation type; the duplicate check needs the
        # real on-disk name (with ".txt") or an existing article is overwritten.
        if generation_type == '短篇':
            base_name = title_text
        elif generation_type == "文章":
            base_name = title.strip()
        else:
            base_name = None

        if base_name is None:
            file_name = ""
        else:
            unique_name = handle_duplicate_files_advanced(
                type_dir, f"{safe_filename(base_name)}.txt")[0]
            file_name = os.path.splitext(unique_name)[0]

        article_save_path = os.path.join(type_dir, f"{file_name}.txt")

        # Strip markdown code fences; the longer marker must be removed first.
        for fence in ("```markdown", "```"):
            message_content = message_content.replace(fence, "")

        message_content = title + "\n" + message_content

        # Optional compliance check, controlled by configuration.
        enable_detection = CONFIG['Baidu'].get('enable_detection', 'false').lower() == 'true'
        if enable_detection:
            print("正在检测文章合规度")
            if text_detection(message_content) == "合规":
                print("文章合规")
            else:
                print("文章不合规")
                return
        else:
            print("违规检测已禁用,跳过检测")

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
        logging.info('文本已经保存')

        if img_urls:
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)

    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
# Shared queues: pending (link, article_type) jobs and per-link outcomes.
task_queue = queue.Queue()
result_queue = queue.Queue()


def worker(ai_service, current_template=None,generation_type=None):
    """
    Consume jobs from ``task_queue`` until a ``None`` sentinel is received.

    Each job is handed to ``process_link``. The outcome is pushed to
    ``result_queue`` as ``(link, success, error_message)``, where ``link`` is
    the URL string so that consumers can write it out directly.

    :param ai_service: AI backend name forwarded to ``process_link``
    :param current_template: optional template forwarded to ``process_link``
    :param generation_type: generation type forwarded to ``process_link``
    """
    while True:
        link_info = task_queue.get()
        try:
            if link_info is None:  # stop signal
                break

            # Jobs are (link, article_type) tuples; report only the URL.
            if isinstance(link_info, (tuple, list)):
                link = link_info[0]
            else:
                link = link_info

            try:
                logger.info(f"开始处理链接:{link}")
                process_link(link_info, ai_service, current_template,generation_type)
                result_queue.put((link, True, None))
            except Exception as e:
                result_queue.put((link, False, str(e)))
                logger.error(f"处理链接 {link} 时出错: {e}")
        except Exception as e:
            logger.error(f"工作线程出错: {e}")
        finally:
            # Mark every fetched item, including the sentinel, as done.
            task_queue.task_done()


def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None,generation_type=None):
    """
    Process links concurrently with a pool of worker threads.

    :param links: list of ``(link, article_type)`` tuples
    :param num_threads: requested thread count (int or numeric string); capped
                        by ``MAX_THREADS`` and ``len(links)``, minimum 1
    :param ai_service: AI backend name passed to each worker
    :param current_template: optional template passed to each worker
    :param generation_type: generation type passed to each worker
    :return: list of ``(link, success, error_message)`` tuples
    """
    if not links:
        return []

    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(int(num_threads), MAX_THREADS, len(links))
    # With zero workers the queued jobs would never be consumed.
    num_threads = max(1, num_threads)

    # Drop leftovers from a previous run without risking a blocking get().
    for stale_queue in (task_queue, result_queue):
        while True:
            try:
                stale_queue.get_nowait()
            except queue.Empty:
                break

    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service, current_template,generation_type))
        t.daemon = True
        t.start()
        threads.append(t)

    for link in links:
        task_queue.put(link)

    # One stop signal per worker.
    for _ in range(num_threads):
        task_queue.put(None)

    for t in threads:
        t.join()

    results = []
    while True:
        try:
            results.append(result_queue.get_nowait())
        except queue.Empty:
            break

    return results
def text_detection(text, timeout=30):
    """
    Ask the Baidu text-censor API whether a text is compliant.

    :param text: text to check
    :param timeout: seconds to wait for the API
    :return: the ``conclusion`` field of the API response
    :raises KeyError: when the response carries no ``conclusion`` field
    """
    url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token=" + get_baidu_access_token()
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }

    # A dict payload is form-encoded by requests, so "&", "=", "%" and
    # non-ASCII characters in the text cannot corrupt the request body.
    response = requests.request("POST", url, headers=headers, data={'text': text}, timeout=timeout)
    data = json.loads(response.text)
    print(data)
    return data['conclusion']


def get_baidu_access_token(timeout=30):
    """
    Exchange the configured Baidu API key / secret key for an access token.

    :param timeout: seconds to wait for the token endpoint
    :return: the token as a string (the string ``"None"`` when the response
             contains no ``access_token``)
    """
    API_KEY = CONFIG['Baidu']['api_key']
    SECRET_KEY = CONFIG['Baidu']['secret_key']

    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    return str(requests.post(url, params=params, timeout=timeout).json().get("access_token"))


def safe_filename(filename):
    """
    Make a string safe to use as a file name.

    Characters that Windows forbids in file names are replaced with ``_``,
    leading/trailing spaces and dots are removed, and an empty result becomes
    ``untitled``.

    :param filename: raw name
    :return: sanitized name, never empty
    """
    invalid_chars = '<>:"/\\|?*'
    cleaned = filename.translate(str.maketrans(invalid_chars, '_' * len(invalid_chars)))
    cleaned = cleaned.strip('. ')
    return cleaned or 'untitled'


def safe_open_directory(directory_path):
    """
    Create a directory (with parents) if it does not exist yet.

    A newly created directory gets mode ``0o777``.

    :param directory_path: directory to create
    :raises Exception: creation failures are logged and re-raised
    """
    try:
        directory_path = os.path.normpath(directory_path)
        if not os.path.exists(directory_path):
            os.makedirs(directory_path, exist_ok=True)
            os.chmod(directory_path, 0o777)
    except Exception as e:
        logging.error(f"创建目录失败: {e}")
        raise


def check_keywords_in_text(text):
    """
    Report whether the text contains any configured banned word.

    Banned words come from the comma-separated
    ``CONFIG['Keywords']['banned_words']`` value.

    :param text: text to scan
    :return: True when at least one banned word occurs in the text
    """
    keywords = CONFIG['Keywords']['banned_words'].split(',')
    for keyword in keywords:
        word = keyword.strip()
        # An empty entry (blank config, trailing comma) is a substring of every
        # text and would flag everything as banned.
        if word and word in text:
            return True
    return False


def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """
    Return the text from its start through the first punctuation mark.

    The matched punctuation mark is included in the result.

    :param text: input text
    :param punctuations: regular expression matching the terminating marks
    :return: the stripped leading segment, or the whole stripped text when no
             mark is found
    """
    match = re.search(punctuations, text)
    if match:
        return text[:match.end()].strip()
    return text.strip()


def read_excel(file_name):
    """
    Read links and their categories from an Excel sheet.

    The first column holds the links; the optional ``领域`` column holds the
    category. Rows without a link are skipped. When the category column is
    missing every row gets ``默认``; a blank category cell becomes ``""``.

    :param file_name: path of the Excel file
    :return: list of ``(link, category)`` string tuples
    """
    datas = pd.read_excel(file_name)
    first_column_name = datas.columns[0]  # link column
    type_column_name = '领域'  # category column

    # Blank cells are read as NaN (a float); callers call .strip() on both fields.
    datas = datas[datas[first_column_name].notna()]
    links = datas[first_column_name].astype(str).tolist()
    if type_column_name in datas.columns:
        types = datas[type_column_name].fillna('').astype(str).tolist()
    else:
        types = ['默认'] * len(links)

    result = list(zip(links, types))
    print(result)

    return result


from typing import Tuple


def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Pick a file name that does not collide with existing files in a folder.

    The name is sanitized with ``safe_filename`` first. If it is free it is
    returned as is; otherwise ``_<n>`` is inserted before the extension, with
    ``n`` one above the highest suffix already present.

    :param folder_path: directory the file will be stored in
    :param filename: desired file name, including its extension
    :return: ``(file name to use, whether it had to be renamed)``
    """
    filename = safe_filename(filename)

    base, ext = os.path.splitext(filename)
    if not os.path.exists(os.path.join(folder_path, filename)):
        return filename, False

    existing_files = set(os.listdir(folder_path))
    pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))

    # Collect the numeric suffixes in use; the bare name counts as 0.
    numbers = []
    for existing in existing_files:
        match = pattern.match(existing)
        if match:
            numbers.append(int(match.group(2)) if match.group(2) else 0)

    next_num = max(numbers) + 1 if numbers else 1
    new_filename = f"{base}_{next_num}{ext}"

    # Guard against the candidate already existing.
    while new_filename in existing_files:
        next_num += 1
        new_filename = f"{base}_{next_num}{ext}"

    return new_filename, True