From 113c97c88799e3263e43a19085f2b9681e97ced0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=AA=E4=B8=80?= <2339117167@qq.com> Date: Tue, 6 May 2025 17:04:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=8E=B7=E5=8F=96=E7=BD=91?= =?UTF-8?q?=E9=A1=B5=E5=86=85=E5=AE=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ArticleReplaceDifyBatchWTX.py | 691 ++++++++++++++++++ ArticleReplaceBatch/ai_studio.py | 50 +- ArticleReplaceBatch/images_edit.py | 7 +- ArticleReplaceBatch/main_process.py | 122 +++- ArticleReplaceBatch/main_process_wtt.py | 259 +++++++ ArticleReplaceBatch/test.py | 23 +- ArticleReplaceBatch/txt2docx.py | 340 +++++++++ ArticleReplaceBatch/utils.py | 42 ++ 8 files changed, 1489 insertions(+), 45 deletions(-) create mode 100644 ArticleReplaceBatch/ArticleReplaceDifyBatchWTX.py create mode 100644 ArticleReplaceBatch/main_process_wtt.py create mode 100644 ArticleReplaceBatch/txt2docx.py diff --git a/ArticleReplaceBatch/ArticleReplaceDifyBatchWTX.py b/ArticleReplaceBatch/ArticleReplaceDifyBatchWTX.py new file mode 100644 index 0000000..1a4ec7c --- /dev/null +++ b/ArticleReplaceBatch/ArticleReplaceDifyBatchWTX.py @@ -0,0 +1,691 @@ + +import sys # 导入sys模块 + + + + +from PIL import Image, ImageDraw, ImageFont, ImageEnhance +import time +import random + +import threading +import tkinter as tk + + +from config import * +from tkinter import ttk, messagebox, filedialog +from tkinter.scrolledtext import ScrolledText + +import pymysql + +from main_process_wtt import link_to_text, task_queue, result_queue + + + +sys.setrecursionlimit(5000) + + +class ArticleReplaceApp(tk.Tk): + def __init__(self): + super().__init__() + + self.title("文章采集与处理工具") + self.geometry("900x600") + + # 创建标签页控件 + self.notebook = ttk.Notebook(self) + self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + + # 创建主页面 + self.main_frame = ttk.Frame(self.notebook) + self.notebook.add(self.main_frame, text="主页面") + + # 创建配置页面 + self.config_frame = ttk.Frame(self.notebook) + self.notebook.add(self.config_frame, text="配置") + # 初始化主页面 + self.init_main_frame() + # 初始化配置页面 + self.init_config_frame() + # 初始化变量 + self.running = False + self.thread = None + self.total_links = 0 + self.processed_links = 0 + + # 设置关闭窗口事件 + self.protocol("WM_DELETE_WINDOW", self.on_close) + + def init_main_frame(self): + # 创建左侧控制面板 + control_frame = ttk.LabelFrame(self.main_frame, text="控制面板") + control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10) + + # Excel文件选择 + ttk.Label(control_frame, text="Excel文件:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.excel_path_var = tk.StringVar(value=TITLE_BASE_PATH) + ttk.Entry(control_frame, textvariable=self.excel_path_var, width=30).grid(row=0, column=1, padx=5, pady=5) + ttk.Button(control_frame, text="浏览", command=self.browse_excel).grid(row=0, column=2, padx=5, pady=5) + + # 线程数设置 + ttk.Label(control_frame, text="线程数:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.thread_count_var = tk.StringVar(value="1") + ttk.Spinbox(control_frame, from_=1, to=MAX_THREADS, textvariable=self.thread_count_var, width=5).grid(row=1, + column=1, + padx=5, + pady=5, + sticky=tk.W) + + # AI服务提供商选择 + ttk.Label(control_frame, text="工作流选择:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.ai_service_var = tk.StringVar(value="dify") + ai_service_combo = ttk.Combobox(control_frame, textvariable=self.ai_service_var, values=["dify", "coze"], width=10, state="readonly") + ai_service_combo.grid(row=2, column=1, padx=5, pady=5, sticky=tk.W) + + # 开始按钮 + self.start_button = ttk.Button(control_frame, text="开始处理", command=self.start_processing) + self.start_button.grid(row=3, column=0, columnspan=3, padx=5, pady=20) + + # 进度条 + ttk.Label(control_frame, text="处理进度:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W) + self.progress_var = tk.DoubleVar() + ttk.Progressbar(control_frame, variable=self.progress_var, maximum=100).grid(row=4, column=1, columnspan=2, + padx=5, pady=5, sticky=tk.EW) + + # 创建右侧日志面板 + log_frame = ttk.LabelFrame(self.main_frame, text="日志") + log_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10) + + # 日志文本框 + self.log_text = ScrolledText(log_frame, width=70, height=30) + self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + self.log_text.config(state=tk.DISABLED) + + # 添加日志处理器 + self.log_handler = LogTextHandler(self.log_text) + self.log_handler.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + self.log_handler.setFormatter(formatter) + logger.addHandler(self.log_handler) + + def init_config_frame(self): + # 创建配置标签页 + config_notebook = ttk.Notebook(self.config_frame) + config_notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + # 创建各个配置页面 + general_frame = ttk.Frame(config_notebook) + database_frame = ttk.Frame(config_notebook) + dify_frame = ttk.Frame(config_notebook) + coze_frame = ttk.Frame(config_notebook) + baidu_frame = ttk.Frame(config_notebook) + image_frame = ttk.Frame(config_notebook) + keywords_frame = ttk.Frame(config_notebook) + + # 添加到标签页 + config_notebook.add(general_frame, text="常规设置") + config_notebook.add(database_frame, text="数据库设置") + config_notebook.add(dify_frame, text="Dify设置") + config_notebook.add(coze_frame, text="Coze设置") + config_notebook.add(baidu_frame, text="百度API设置") + config_notebook.add(image_frame, text="图片处理设置") + config_notebook.add(keywords_frame, text="违禁词设置") + + # 初始化各个配置页面 + self.init_general_config(general_frame) + self.init_database_config(database_frame) + self.init_dify_config(dify_frame) + self.init_coze_config(coze_frame) + self.init_baidu_config(baidu_frame) + self.init_image_config(image_frame) + self.init_keywords_config(keywords_frame) + + # 保存按钮 + save_button = ttk.Button(self.config_frame, text="保存所有配置", command=self.save_all_configs) + save_button.pack(side=tk.RIGHT, padx=10, pady=10) + + def init_general_config(self, parent): + # Chrome用户目录 + ttk.Label(parent, text="Chrome用户目录:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.chrome_dir_var = tk.StringVar(value=CONFIG['General']['chrome_user_dir']) + ttk.Entry(parent, textvariable=self.chrome_dir_var, width=50).grid(row=0, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.chrome_dir_var)).grid(row=0, + column=2, + padx=5, pady=5) + + # 文章保存路径 + ttk.Label(parent, text="文章保存路径:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.articles_path_var = tk.StringVar(value=CONFIG['General']['articles_path']) + ttk.Entry(parent, textvariable=self.articles_path_var, width=50).grid(row=1, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.articles_path_var)).grid(row=1, + column=2, + padx=5, + pady=5) + + # 图片保存路径 + ttk.Label(parent, text="图片保存路径:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.images_path_var = tk.StringVar(value=CONFIG['General']['images_path']) + ttk.Entry(parent, textvariable=self.images_path_var, width=50).grid(row=2, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_directory(self.images_path_var)).grid(row=2, + column=2, + padx=5, + pady=5) + + # Excel文件路径 + ttk.Label(parent, text="默认Excel文件:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.excel_file_var = tk.StringVar(value=CONFIG['General']['title_file']) + ttk.Entry(parent, textvariable=self.excel_file_var, width=50).grid(row=3, column=1, padx=5, pady=5) + ttk.Button(parent, text="浏览", command=lambda: self.browse_file(self.excel_file_var, [("Excel文件", "*.xlsx"), + ("所有文件", + "*.*")])).grid(row=3, + column=2, + padx=5, + pady=5) + + # 最大线程数 + ttk.Label(parent, text="最大线程数:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W) + self.max_threads_var = tk.StringVar(value=CONFIG['General']['max_threads']) + ttk.Spinbox(parent, from_=1, to=10, textvariable=self.max_threads_var, width=5).grid(row=4, column=1, padx=5, + pady=5, sticky=tk.W) + + def init_database_config(self, parent): + # 数据库主机 + ttk.Label(parent, text="数据库主机:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.db_host_var = tk.StringVar(value=CONFIG['Database']['host']) + ttk.Entry(parent, textvariable=self.db_host_var, width=30).grid(row=0, column=1, padx=5, pady=5) + + # 数据库用户名 + ttk.Label(parent, text="数据库用户名:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.db_user_var = tk.StringVar(value=CONFIG['Database']['user']) + ttk.Entry(parent, textvariable=self.db_user_var, width=30).grid(row=1, column=1, padx=5, pady=5) + + # 数据库密码 + ttk.Label(parent, text="数据库密码:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.db_password_var = tk.StringVar(value=CONFIG['Database']['password']) + ttk.Entry(parent, textvariable=self.db_password_var, width=30, show="*").grid(row=2, column=1, padx=5, pady=5) + + # 数据库名称 + ttk.Label(parent, text="数据库名称:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.db_name_var = tk.StringVar(value=CONFIG['Database']['database']) + ttk.Entry(parent, textvariable=self.db_name_var, width=30).grid(row=3, column=1, padx=5, pady=5) + + # 测试连接按钮 + ttk.Button(parent, text="测试连接", command=self.test_db_connection).grid(row=4, column=1, padx=5, pady=10, + sticky=tk.E) + + def init_dify_config(self, parent): + # Dify API Key + ttk.Label(parent, text="API Key:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_api_key_var = tk.StringVar(value=CONFIG['Dify']['api_key']) + ttk.Entry(parent, textvariable=self.dify_api_key_var, width=50).grid(row=0, column=1, padx=5, pady=5) + + # Dify User ID + ttk.Label(parent, text="User ID:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_user_id_var = tk.StringVar(value=CONFIG['Dify']['user_id']) + ttk.Entry(parent, textvariable=self.dify_user_id_var, width=30).grid(row=1, column=1, padx=5, pady=5) + + # Dify URL + ttk.Label(parent, text="URL:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_url_var = tk.StringVar(value=CONFIG['Dify']['url']) + ttk.Entry(parent, textvariable=self.dify_url_var, width=50).grid(row=2, column=1, padx=5, pady=5) + + # Dify Input Data Template + ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.dify_input_data_template_var = tk.StringVar(value=CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')) # 添加默认值 + ttk.Entry(parent, textvariable=self.dify_input_data_template_var, width=50).grid(row=3, column=1, padx=5, pady=5) + + def init_coze_config(self, parent): + # Coze Workflow ID + ttk.Label(parent, text="Workflow ID:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.coze_workflow_id_var = tk.StringVar(value=CONFIG['Coze']['workflow_id']) + ttk.Entry(parent, textvariable=self.coze_workflow_id_var, width=50).grid(row=0, column=1, padx=5, pady=5) + + # Coze Access Token + ttk.Label(parent, text="Access Token:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.coze_access_token_var = tk.StringVar(value=CONFIG['Coze']['access_token']) + ttk.Entry(parent, textvariable=self.coze_access_token_var, width=50).grid(row=1, column=1, padx=5, pady=5) + + # Coze Is Async + ttk.Label(parent, text="Is Async:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.coze_is_async_var = tk.StringVar(value=CONFIG['Coze']['is_async']) + ttk.Combobox(parent, textvariable=self.coze_is_async_var, values=["true", "false"], width=10, state="readonly").grid(row=2, column=1, padx=5, pady=5, sticky=tk.W) + + # Coze Input Data Template + ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.coze_input_data_template_var = tk.StringVar(value=CONFIG['Coze'].get('input_data_template', '{{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}}')) # 添加默认值 + ttk.Entry(parent, textvariable=self.coze_input_data_template_var, width=50).grid(row=3, column=1, padx=5, pady=5) + + def init_baidu_config(self, parent): + # 百度 API Key + ttk.Label(parent, text="API Key:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.baidu_api_key_var = tk.StringVar(value=CONFIG['Baidu']['api_key']) + ttk.Entry(parent, textvariable=self.baidu_api_key_var, width=50).grid(row=0, column=1, padx=5, pady=5) + + # 百度 Secret Key + ttk.Label(parent, text="Secret Key:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.baidu_secret_key_var = tk.StringVar(value=CONFIG['Baidu']['secret_key']) + ttk.Entry(parent, textvariable=self.baidu_secret_key_var, width=50).grid(row=1, column=1, padx=5, pady=5) + + def init_image_config(self, parent): + # 裁剪百分比 + ttk.Label(parent, text="裁剪百分比:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.crop_percent_var = tk.StringVar(value=CONFIG['ImageModify']['crop_percent']) + ttk.Entry(parent, textvariable=self.crop_percent_var, width=10).grid(row=0, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最小旋转角度 + ttk.Label(parent, text="最小旋转角度:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.min_rotation_var = tk.StringVar(value=CONFIG['ImageModify']['min_rotation']) + ttk.Entry(parent, textvariable=self.min_rotation_var, width=10).grid(row=1, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最大旋转角度 + ttk.Label(parent, text="最大旋转角度:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W) + self.max_rotation_var = tk.StringVar(value=CONFIG['ImageModify']['max_rotation']) + ttk.Entry(parent, textvariable=self.max_rotation_var, width=10).grid(row=2, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最小亮度 + ttk.Label(parent, text="最小亮度:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W) + self.min_brightness_var = tk.StringVar(value=CONFIG['ImageModify']['min_brightness']) + ttk.Entry(parent, textvariable=self.min_brightness_var, width=10).grid(row=3, column=1, padx=5, pady=5, + sticky=tk.W) + + # 最大亮度 + ttk.Label(parent, text="最大亮度:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W) + self.max_brightness_var = tk.StringVar(value=CONFIG['ImageModify']['max_brightness']) + ttk.Entry(parent, textvariable=self.max_brightness_var, width=10).grid(row=4, column=1, padx=5, pady=5, + sticky=tk.W) + + # 水印文字 + ttk.Label(parent, text="水印文字:").grid(row=0, column=2, padx=5, pady=5, sticky=tk.W) + self.watermark_text_var = tk.StringVar(value=CONFIG['ImageModify']['watermark_text']) + ttk.Entry(parent, textvariable=self.watermark_text_var, width=30).grid(row=0, column=3, padx=5, pady=5) + + # 水印透明度 + ttk.Label(parent, text="水印透明度:").grid(row=1, column=2, padx=5, pady=5, sticky=tk.W) + self.watermark_opacity_var = tk.StringVar(value=CONFIG['ImageModify']['watermark_opacity']) + ttk.Entry(parent, textvariable=self.watermark_opacity_var, width=10).grid(row=1, column=3, padx=5, pady=5, + sticky=tk.W) + + # 蒙版透明度 + ttk.Label(parent, text="蒙版透明度:").grid(row=2, column=2, padx=5, pady=5, sticky=tk.W) + self.overlay_opacity_var = tk.StringVar(value=CONFIG['ImageModify']['overlay_opacity']) + ttk.Entry(parent, textvariable=self.overlay_opacity_var, width=10).grid(row=2, column=3, padx=5, pady=5, + sticky=tk.W) + + # 预览按钮 + ttk.Button(parent, text="预览效果", command=self.preview_image_effect).grid(row=4, column=3, padx=5, pady=5, + sticky=tk.E) + + def init_keywords_config(self, parent): + # 违禁词列表 + ttk.Label(parent, text="违禁词列表:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.banned_words_text = ScrolledText(parent, width=60, height=15) + self.banned_words_text.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky=tk.NSEW) + self.banned_words_text.insert(tk.END, CONFIG['Keywords']['banned_words'].replace(',', '\n')) + + # 保存按钮 + ttk.Button(parent, text="保存违禁词", command=self.save_banned_words).grid(row=2, column=1, padx=5, pady=5, + sticky=tk.E) + + # 配置行列权重 + parent.columnconfigure(0, weight=1) + parent.rowconfigure(1, weight=1) + + def save_banned_words(self): + # 处理文本,将换行符替换为逗号 + words = self.banned_words_text.get(1.0, tk.END).strip().replace('\n', ',') + CONFIG['Keywords']['banned_words'] = words + messagebox.showinfo("保存成功", "违禁词列表已更新") + + def browse_directory(self, var): + directory = filedialog.askdirectory() + if directory: + var.set(directory) + + def browse_file(self, var, filetypes): + file_path = filedialog.askopenfilename(filetypes=filetypes) + if file_path: + var.set(file_path) + + def browse_excel(self): + file_path = filedialog.askopenfilename(filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")]) + if file_path: + self.excel_path_var.set(file_path) + + def test_db_connection(self): + try: + host = self.db_host_var.get() + user = self.db_user_var.get() + password = self.db_password_var.get() + database = self.db_name_var.get() + + connection = pymysql.connect( + host=host, + user=user, + password=password, + database=database + ) + connection.close() + messagebox.showinfo("连接成功", "数据库连接测试成功!") + except Exception as e: + messagebox.showerror("连接失败", f"数据库连接测试失败:{e}") + + def preview_image_effect(self): + try: + # 创建一个示例图片 + img = Image.new('RGB', (400, 300), color=(240, 240, 240)) + draw = ImageDraw.Draw(img) + draw.rectangle([50, 50, 350, 250], fill=(200, 200, 200)) + draw.text((150, 140), "示例图片", fill=(0, 0, 0)) + + # 应用图片修改效果 + modified_img = self.apply_image_modifications(img) + + # 显示修改后的图片 + self.show_preview_image(modified_img) + except Exception as e: + messagebox.showerror("预览失败", f"生成预览图片时出错:{e}") + + def apply_image_modifications(self, img): + """应用当前配置的图片修改效果""" + width, height = img.size + + try: + # 从界面获取参数 + crop_percent = float(self.crop_percent_var.get()) + min_rotation = float(self.min_rotation_var.get()) + max_rotation = float(self.max_rotation_var.get()) + min_brightness = float(self.min_brightness_var.get()) + max_brightness = float(self.max_brightness_var.get()) + watermark_text = self.watermark_text_var.get() + watermark_opacity = int(self.watermark_opacity_var.get()) + overlay_opacity = int(self.overlay_opacity_var.get()) + + # 1. 裁剪边缘 + crop_px_w = int(width * crop_percent) + crop_px_h = int(height * crop_percent) + img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h)) + + # 2. 随机旋转 + angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1]) + img = img.rotate(angle, expand=True) + + # 3. 调整亮度 + enhancer = ImageEnhance.Brightness(img) + factor = random.uniform(min_brightness, max_brightness) + img = enhancer.enhance(factor) + + # 4. 添加文字水印 + draw = ImageDraw.Draw(img) + font_size = max(20, int(min(img.size) * 0.05)) + try: + font = ImageFont.truetype("arial.ttf", font_size) + except: + font = ImageFont.load_default() + + # 获取文本尺寸 + text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:] + + # 水印放在图片右下角 + x = img.size[0] - text_width - 5 + y = img.size[1] - text_height - 5 + draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, watermark_opacity)) + + # 5. 添加半透明蒙版 + overlay = Image.new('RGBA', img.size, (255, 255, 255, overlay_opacity)) + if img.mode != 'RGBA': + img = img.convert('RGBA') + img = Image.alpha_composite(img, overlay) + + return img.convert('RGB') + except Exception as e: + messagebox.showerror("参数错误", f"应用图片修改时出错:{e}") + return img + + def show_preview_image(self, img): + """显示预览图片""" + preview_window = tk.Toplevel(self) + preview_window.title("图片效果预览") + preview_window.geometry("500x400") + + # 将PIL图像转换为Tkinter可用的格式 + from PIL import ImageTk + tk_img = ImageTk.PhotoImage(img) + + # 显示图片 + label = tk.Label(preview_window, image=tk_img) + label.image = tk_img # 保持引用 + label.pack(padx=10, pady=10) + + # 关闭按钮 + ttk.Button(preview_window, text="关闭", command=preview_window.destroy).pack(pady=10) + + def save_all_configs(self): + """保存所有配置到配置文件""" + try: + # 更新General配置 + CONFIG['General']['chrome_user_dir'] = self.chrome_dir_var.get() + CONFIG['General']['articles_path'] = self.articles_path_var.get() + CONFIG['General']['images_path'] = self.images_path_var.get() + CONFIG['General']['title_file'] = self.excel_file_var.get() + CONFIG['General']['max_threads'] = self.max_threads_var.get() + + # 更新Database配置 + CONFIG['Database']['host'] = self.db_host_var.get() + CONFIG['Database']['user'] = self.db_user_var.get() + CONFIG['Database']['password'] = self.db_password_var.get() + CONFIG['Database']['database'] = self.db_name_var.get() + + # 更新Dify配置 + CONFIG['Dify']['api_key'] = self.dify_api_key_var.get() + CONFIG['Dify']['user_id'] = self.dify_user_id_var.get() + CONFIG['Dify']['url'] = self.dify_url_var.get() + CONFIG['Dify']['input_data_template'] = self.dify_input_data_template_var.get() # 保存新字段 + + # 更新Coze配置 + CONFIG['Coze']['workflow_id'] = self.coze_workflow_id_var.get() + CONFIG['Coze']['access_token'] = self.coze_access_token_var.get() + CONFIG['Coze']['is_async'] = self.coze_is_async_var.get() + CONFIG['Coze']['input_data_template'] = self.coze_input_data_template_var.get() # 保存 Coze input data 模板 + + # 更新Baidu配置 + CONFIG['Baidu']['api_key'] = self.baidu_api_key_var.get() + CONFIG['Baidu']['secret_key'] = self.baidu_secret_key_var.get() + + # 更新ImageModify配置 + CONFIG['ImageModify']['crop_percent'] = self.crop_percent_var.get() + CONFIG['ImageModify']['min_rotation'] = self.min_rotation_var.get() + CONFIG['ImageModify']['max_rotation'] = self.max_rotation_var.get() + CONFIG['ImageModify']['min_brightness'] = self.min_brightness_var.get() + CONFIG['ImageModify']['max_brightness'] = self.max_brightness_var.get() + CONFIG['ImageModify']['watermark_text'] = self.watermark_text_var.get() + CONFIG['ImageModify']['watermark_opacity'] = self.watermark_opacity_var.get() + CONFIG['ImageModify']['overlay_opacity'] = self.overlay_opacity_var.get() + + # 保存配置到文件 + save_config(CONFIG) + + # 更新全局变量 + global USER_DIR_PATH, ARTICLES_BASE_PATH, IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS + USER_DIR_PATH = CONFIG['General']['chrome_user_dir'] + ARTICLES_BASE_PATH = CONFIG['General']['articles_path'] + IMGS_BASE_PATH = CONFIG['General']['images_path'] + TITLE_BASE_PATH = CONFIG['General']['title_file'] + MAX_THREADS = int(CONFIG['General']['max_threads']) + + # 创建必要的目录 + if not os.path.exists(ARTICLES_BASE_PATH): + os.makedirs(ARTICLES_BASE_PATH) + if not os.path.exists(IMGS_BASE_PATH): + os.makedirs(IMGS_BASE_PATH) + + messagebox.showinfo("保存成功", "所有配置已保存") + except Exception as e: + messagebox.showerror("保存失败", f"保存配置时出错:{e}") + + def start_processing(self): + """开始处理链接""" + if self.running: + messagebox.showinfo("处理中", "已有任务正在处理中,请等待完成") + return + + try: + # 更新Excel文件路径 + excel_path = self.excel_path_var.get() + if not os.path.exists(excel_path): + messagebox.showerror("文件错误", f"Excel文件不存在:{excel_path}") + return + + # 获取线程数 + try: + num_threads = int(self.thread_count_var.get()) + if num_threads < 1: + num_threads = 1 + elif num_threads > MAX_THREADS: + num_threads = MAX_THREADS + except: + num_threads = 1 + + # 禁用开始按钮 + self.start_button.config(state=tk.DISABLED) + self.running = True + + # 清空日志 + self.log_text.config(state=tk.NORMAL) + self.log_text.delete(1.0, tk.END) + self.log_text.config(state=tk.DISABLED) + + # 获取AI服务提供商选择 + ai_service = self.ai_service_var.get() + + # 在新线程中运行处理任务 + self.thread = threading.Thread(target=self.run_processing, args=(excel_path, num_threads, ai_service)) + self.thread.daemon = True + self.thread.start() + + # 启动进度更新 + self.after(100, self.update_progress) + except Exception as e: + messagebox.showerror("启动失败", f"启动处理任务时出错:{e}") + self.start_button.config(state=tk.NORMAL) + self.running = False + + def run_processing(self, excel_path, num_threads, ai_service): + """在后台线程中运行处理任务""" + try: + # 更新全局变量 + global TITLE_BASE_PATH + TITLE_BASE_PATH = excel_path + + # 记录开始时间 + start_time = time.time() + + # 读取链接并处理 + logger.info(f"开始处理链接,使用 {num_threads} 个线程") + results = link_to_text(num_threads=num_threads, ai_service=ai_service) + + # 计算处理结果 + total_links = len(results) + success_links = sum(1 for _, success, _ in results if success) + + # 记录结束时间和总耗时 + end_time = time.time() + elapsed_time = end_time - start_time + + # 记录处理结果 + logger.info( + f"处理完成,共处理 {total_links} 个链接,成功 {success_links} 个,失败 {total_links - success_links} 个") + logger.info(f"总耗时: {elapsed_time:.2f} 秒") + + # 在主线程中显示处理结果 + self.after(0, lambda: messagebox.showinfo("处理完成", + f"共处理 {total_links} 个链接\n成功: {success_links} 个\n失败: {total_links - success_links} 个\n总耗时: {elapsed_time:.2f} 秒")) + except Exception as e: + logger.error(f"处理任务出错: {e}") + self.after(0, lambda: messagebox.showerror("处理失败", f"处理任务出错:{e}")) + finally: + # 恢复开始按钮状态 + self.after(0, lambda: self.start_button.config(state=tk.NORMAL)) + self.running = False + + def update_progress(self): + """更新进度条和状态""" + if not self.running: + return + + try: + # 获取当前进度 + total = task_queue.qsize() + result_queue.qsize() + done = result_queue.qsize() + + if total > 0: + # 更新进度条 + progress = (done / total) * 100 + self.progress_var.set(progress) + + # 更新标题显示进度 + self.title(f"文章采集与处理工具 - 进度: {progress:.1f}%") + + # 继续更新 + self.after(500, self.update_progress) + except Exception as e: + logger.error(f"更新进度出错: {e}") + + def on_close(self): + """关闭窗口时的处理""" + if self.running: + if messagebox.askyesno("确认退出", "任务正在处理中,确定要退出吗?"): + self.destroy() + else: + self.destroy() + + + +# 日志处理器类,用于将日志输出到文本框 +class LogTextHandler(logging.Handler): + def __init__(self, text_widget): + logging.Handler.__init__(self) + self.text_widget = text_widget + + def emit(self, record): + msg = self.format(record) + + def append(): + self.text_widget.configure(state=tk.NORMAL) + self.text_widget.insert(tk.END, msg + '\n') + self.text_widget.see(tk.END) # 自动滚动到底部 + self.text_widget.configure(state=tk.DISABLED) + + # 在主线程中更新UI + self.text_widget.after(0, append) + + + + +# 主函数 +def main(): + # 初始化日志 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("article_replace.log", encoding='utf-8'), + logging.StreamHandler() + ] + ) + + # 创建必要的目录 + if not os.path.exists(ARTICLES_BASE_PATH): + os.makedirs(ARTICLES_BASE_PATH) + if not os.path.exists(IMGS_BASE_PATH): + os.makedirs(IMGS_BASE_PATH) + + # 启动GUI应用 + app = ArticleReplaceApp() + app.mainloop() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ArticleReplaceBatch/ai_studio.py b/ArticleReplaceBatch/ai_studio.py index 2083bb7..cf6a2b9 100644 --- a/ArticleReplaceBatch/ai_studio.py +++ b/ArticleReplaceBatch/ai_studio.py @@ -37,7 +37,6 @@ def call_dify_workflow(input_data): return article - # ==========================调用coze工作流========================== @@ -54,17 +53,18 @@ def call_coze_workflow(parameters): is_async = CONFIG['Coze']['is_async'].lower() == 'true' url = "https://api.coze.cn/v1/workflow/run" + headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } + data = { "workflow_id": workflow_id, "parameters": parameters, "is_async": is_async } - response = requests.post(url, json=data, headers=headers) if response.status_code == 200: @@ -78,3 +78,49 @@ def call_coze_workflow(parameters): "detail": response.text } + +def call_coze_article_workflow(parameters): + """ + 调用 Coze 工作流的函数 + + :param parameters: 传递给工作流的输入参数(字典格式) + :param is_async: 是否异步执行(默认 False) + :return: 工作流的执行结果 + """ + + workflow_id = CONFIG['Coze']['workflow_id'] + access_token = CONFIG['Coze']['access_token'] + is_async = CONFIG['Coze']['is_async'].lower() == 'true' + url = "https://api.coze.cn/v1/workflow/run" + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json" + } + data = { + "workflow_id": workflow_id, + "parameters": parameters, + "is_async": is_async + } + + response = requests.post(url, json=data, headers=headers) + + if response.status_code == 200: + # data = json.loads(response.text)['data'] + # print("data:",data['output']) + import ast + + # 直接解析整个result字符串 + result_dict = ast.literal_eval(response.text) + + # 解析data字段 + data_dict = ast.literal_eval(result_dict['data']) + + # 获取output的值 + output_value = data_dict['output'] + + return output_value + else: + return { + "error": f"请求失败,状态码:{response.status_code}", + "detail": response.text + } diff --git a/ArticleReplaceBatch/images_edit.py b/ArticleReplaceBatch/images_edit.py index 570cdc4..0e4213d 100644 --- a/ArticleReplaceBatch/images_edit.py +++ b/ArticleReplaceBatch/images_edit.py @@ -10,6 +10,8 @@ from config import * from utils import safe_open_directory IMGS_BASE_PATH = CONFIG['General']['images_path'] + + def crop_and_replace_images(folder_path): """ 修改图片尺寸 @@ -89,7 +91,10 @@ def download_and_process_images(img_urls, article_title): safe_open_directory(img_dir_path) for i, img_url in enumerate(img_urls): - imgurl = "https:" + img_url + if img_url.startswith("https"): + imgurl = img_url + else: + imgurl = "https:"+img_url img_path = os.path.join(img_dir_path, f"图片{i}.jpg") try: download_image(imgurl, img_path) diff --git a/ArticleReplaceBatch/main_process.py b/ArticleReplaceBatch/main_process.py index 3228d48..3316d3f 100644 --- a/ArticleReplaceBatch/main_process.py +++ b/ArticleReplaceBatch/main_process.py @@ -1,8 +1,8 @@ import threading import queue -import json # 导入 json 模块 -from ai_studio import call_dify_workflow, call_coze_workflow + +from ai_studio import call_dify_workflow, call_coze_workflow,call_coze_article_workflow from databases import * from images_edit import download_and_process_images @@ -20,9 +20,7 @@ def process_link(link, ai_service): """ try: if link.startswith("https://www.toutiao.com"): - title_text, article_text, img_urls = toutiao_w_extract_content(link) - if title_text == "": - title_text, article_text, img_urls = toutiao_extract_content(link) + title_text, article_text, img_urls = toutiao_extract_content(link) elif link.startswith("https://mp.weixin.qq.co"): title_text, article_text, img_urls = wechat_extract_content(link) else: @@ -44,8 +42,7 @@ def process_link(link, ai_service): title = extract_content_until_punctuation(article_text).replace("正文:", "") - print(title) - print(article_text) + logger.info(img_urls) from datetime import datetime @@ -54,6 +51,7 @@ def process_link(link, ai_service): # 打印当前时间 print("当前时间:", current_time) + logger.info(title_text) if ai_service == "dify": if check_keywords: @@ -61,7 +59,7 @@ def process_link(link, ai_service): check_link_insert(host, user, password, database, link) return # 从配置加载 input_data 模板 - input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}') + input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"title": "{article_text}"}}') try: # 解析模板字符串为字典 input_data_template = json.loads(input_data_template_str) @@ -83,50 +81,53 @@ def process_link(link, ai_service): if check_keywords: weijin = "违禁" # 从配置加载 Coze input_data 模板 - input_data_template_str = CONFIG['Coze'].get('input_data_template', - '{{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}}') - try: - # 解析模板字符串为字典 - input_data_template = json.loads(input_data_template_str) - # 使用实际变量格式化模板 - input_data = {k: v.format(article_text=article_text, link=link, weijin=weijin) for k, v in - input_data_template.items()} - except (json.JSONDecodeError, KeyError, AttributeError) as e: - logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.") - input_data = { - "article": article_text, - "link": link, - "weijin": weijin - } + # input_data_template_str = CONFIG['Coze'].get('input_data_template', + # f'{{"title": "{title_text}"}}') + # try: + # # 解析模板字符串为字典 + # input_data_template = json.loads(input_data_template_str) + # # 使用实际变量格式化模板 + # input_data = {k: v.format(article_text=article_text) for k, v in + # input_data_template.items()} + # except (json.JSONDecodeError, KeyError, AttributeError) as e: + # logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.") + # input_data = { + # "title": title_text + # + # } + + input_data = { + "title": title_text + + } + message_content = call_coze_article_workflow(input_data) + # message_content = msg['result'] - msg = call_coze_workflow(input_data) - message_content = msg['article'] - result = msg['result'] - if result == "已经创作过": - return # 获取当前时间并格式化 current_time = datetime.now().strftime("%H:%M:%S") # 打印当前时间 print("当前时间:", current_time) - finally_article = message_content.replace("正文:", "") + "\n" + # finally_article = message_content.replace("正文:", "") + "\n" - article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt") + file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title_text) - if '*' in finally_article or '#' in finally_article or "-" in finally_article: + article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{file_name}.txt") + + if '*' in message_content or '#' in message_content or "-" in message_content: # 使用正则表达式一次性替换多个字符 old_content = re.sub(r'[*#-]', '', message_content) else: # 如果不需要替换,直接使用原内容 - old_content = finally_article + old_content = message_content print("改写完成的文章:" + old_content) # 删除AI词汇 content = old_content - check_link_insert(host, user, password, database, link) + # check_link_insert(host, user, password, database, link) # 判断文章合规度 if text_detection(content) == "合规": @@ -141,14 +142,65 @@ def process_link(link, ai_service): logging.info('文本已经保存') if img_urls: - download_and_process_images(img_urls, title) + download_and_process_images(img_urls, file_name) except Exception as e: logging.error(f"处理链接 {link} 时出错: {e}") raise -def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"): +def link_to_text(num_threads=None, ai_service="dify"): + use_link_path = 'use_link_path.txt' + + # 读取链接 + links = read_excel(TITLE_BASE_PATH) + + # 过滤已处理的链接 + filtered_links = [] + host = CONFIG['Database']['host'] + user = CONFIG['Database']['user'] + password = CONFIG['Database']['password'] + database = CONFIG['Database']['database'] + + # for link in links: + # logging.info(f"总共{len(links)}个链接") + # if check_link_exists(host, user, password, database, link): + # logger.info(f"链接已存在: {link}") + # continue + # else: + # filtered_links.append(link) + # logger.info(f"链接不存在: {link}") + # print("链接不存在,存储到过滤器中:", link) + for link in links: + logging.info(f"总共{len(links)}个链接") + filtered_links.append(link) + + # if check_link_exists(host, user, password, database, link): + # logger.info(f"链接已存在: {link}") + # continue + # else: + # filtered_links.append(link) + # logger.info(f"链接不存在: {link}") + # print("链接不存在,存储到过滤器中:", link) + + if not filtered_links: + logger.info("没有新链接需要处理") + return [] + + # 使用多线程处理链接 + results = process_links_with_threads(filtered_links, num_threads, ai_service) + + # 记录已处理的链接 + with open(use_link_path, 'a+', encoding='utf-8') as f: + for link, success, _ in results: + if success: + f.write(link + "\n") + + return results + + + +def link_to_mysql_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"): use_link_path = 'use_link_path.txt' # 读取链接 diff --git a/ArticleReplaceBatch/main_process_wtt.py b/ArticleReplaceBatch/main_process_wtt.py new file mode 100644 index 0000000..3228d48 --- /dev/null +++ b/ArticleReplaceBatch/main_process_wtt.py @@ -0,0 +1,259 @@ +import threading +import queue +import json # 导入 json 模块 + +from ai_studio import call_dify_workflow, call_coze_workflow +from databases import * + +from images_edit import download_and_process_images +from utils import * +from get_web_content import * +from config import * + + +# ==============================主程序=========================== +def process_link(link, ai_service): + """ + 处理单个链接 + :param link: 要处理的链接 + :param ai_service: AI服务提供商,可选值:dify, coze + """ + try: + if link.startswith("https://www.toutiao.com"): + title_text, article_text, img_urls = toutiao_w_extract_content(link) + if title_text == "": + title_text, article_text, img_urls = toutiao_extract_content(link) + elif link.startswith("https://mp.weixin.qq.co"): + title_text, article_text, img_urls = wechat_extract_content(link) + else: + title_text, article_text, img_urls = "", "", [] + + if title_text == "": + return + elif len(title_text) > 100: + return + + # 获取数据库配置 + host = CONFIG['Database']['host'] + user = CONFIG['Database']['user'] + password = CONFIG['Database']['password'] + database = CONFIG['Database']['database'] + + # 判断文章内容是否有违禁词 + check_keywords = check_keywords_in_text(title_text) + + title = extract_content_until_punctuation(article_text).replace("正文:", "") + + print(title) + print(article_text) + + from datetime import datetime + + # 获取当前时间并格式化 + current_time = datetime.now().strftime("%H:%M:%S") + + # 打印当前时间 + print("当前时间:", current_time) + + if ai_service == "dify": + if check_keywords: + print("文章中有违禁词!") + check_link_insert(host, user, password, database, link) + return + # 从配置加载 input_data 模板 + input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}') + try: + # 解析模板字符串为字典 + input_data_template = json.loads(input_data_template_str) + # 使用实际变量格式化模板 + input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()} + except (json.JSONDecodeError, KeyError, AttributeError) as e: + logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.") + input_data = { + "old_article": article_text + } + + # input_data = { + # "old_article": article_text + # } + message_content = call_dify_workflow(input_data) + elif ai_service == "coze": + logger.info("coze正在处理") + weijin = "" + if check_keywords: + weijin = "违禁" + # 从配置加载 Coze input_data 模板 + input_data_template_str = CONFIG['Coze'].get('input_data_template', + '{{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}}') + try: + # 解析模板字符串为字典 + input_data_template = json.loads(input_data_template_str) + # 使用实际变量格式化模板 + input_data = {k: v.format(article_text=article_text, link=link, weijin=weijin) for k, v in + input_data_template.items()} + except (json.JSONDecodeError, KeyError, AttributeError) as e: + logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.") + input_data = { + "article": article_text, + "link": link, + "weijin": weijin + } + + msg = call_coze_workflow(input_data) + message_content = msg['article'] + result = msg['result'] + if result == "已经创作过": + return + # 获取当前时间并格式化 + current_time = datetime.now().strftime("%H:%M:%S") + + # 打印当前时间 + print("当前时间:", current_time) + + finally_article = message_content.replace("正文:", "") + "\n" + + article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt") + + if '*' in finally_article or '#' in finally_article or "-" in finally_article: + # 使用正则表达式一次性替换多个字符 + old_content = re.sub(r'[*#-]', '', message_content) + else: + # 如果不需要替换,直接使用原内容 + old_content = finally_article + + print("改写完成的文章:" + old_content) + + # 删除AI词汇 + content = old_content + + check_link_insert(host, user, password, database, link) + + # 判断文章合规度 + if text_detection(content) == "合规": + print("文章合规") + pass + else: + print("文章不合规") + return + + with open(article_save_path, 'w', encoding='utf-8') as f: + f.write(content) + logging.info('文本已经保存') + + if img_urls: + download_and_process_images(img_urls, title) + + except Exception as e: + logging.error(f"处理链接 {link} 时出错: {e}") + raise + + +def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"): + use_link_path = 'use_link_path.txt' + + # 读取链接 + links = read_excel(TITLE_BASE_PATH) + + # 过滤已处理的链接 + filtered_links = [] + host = CONFIG['Database']['host'] + user = CONFIG['Database']['user'] + password = CONFIG['Database']['password'] + database = CONFIG['Database']['database'] + + for link in links: + logging.info(f"总共{len(links)}个链接") + if check_link_exists(host, user, password, database, link): + logger.info(f"链接已存在: {link}") + continue + else: + filtered_links.append(link) + logger.info(f"链接不存在: {link}") + print("链接不存在,存储到过滤器中:", link) + + if not filtered_links: + logger.info("没有新链接需要处理") + return [] + + # 使用多线程处理链接 + results = process_links_with_threads(filtered_links, num_threads, ai_service) + + # 记录已处理的链接 + with open(use_link_path, 'a+', encoding='utf-8') as f: + for link, success, _ in results: + if success: + f.write(link + "\n") + + return results + + +# 创建一个任务队列和结果队列 +task_queue = queue.Queue() +result_queue = queue.Queue() + + +# 工作线程函数 +def worker(ai_service): + while True: + try: + # 从队列中获取任务 + link = task_queue.get() + if link is None: # 结束信号 + break + + # 处理链接 + try: + logger.info(f"开始处理链接:{link}") + process_link(link, ai_service) + result_queue.put((link, True, None)) # 成功 + except Exception as e: + result_queue.put((link, False, str(e))) # 失败 + logger.error(f"处理链接 {link} 时出错: {e}") + + # 标记任务完成 + task_queue.task_done() + except Exception as e: + logger.error(f"工作线程出错: {e}") + + +# 多线程处理链接 +def process_links_with_threads(links, num_threads=None, ai_service="dify"): + if num_threads is None: + num_threads = min(MAX_THREADS, len(links)) + else: + num_threads = min(num_threads, MAX_THREADS, len(links)) + + # 清空任务队列和结果队列 + while not task_queue.empty(): + task_queue.get() + while not result_queue.empty(): + result_queue.get() + + # 创建工作线程 + threads = [] + + # 将AI服务选择传递给worker函数 + for _ in range(num_threads): + t = threading.Thread(target=worker, args=(ai_service,)) + t.daemon = True + t.start() + threads.append(t) + + # 添加任务到队列 + for link in links: + task_queue.put(link) + + # 添加结束信号 + for _ in range(num_threads): + task_queue.put(None) + + # 等待所有线程完成 + for t in threads: + t.join() + + # 处理结果 + results = [] + while not result_queue.empty(): + results.append(result_queue.get()) + + return results diff --git a/ArticleReplaceBatch/test.py b/ArticleReplaceBatch/test.py index 9ab9351..ccd652a 100644 --- a/ArticleReplaceBatch/test.py +++ b/ArticleReplaceBatch/test.py @@ -1,11 +1,20 @@ -from get_web_content import wechat_extract_content,toutiao_w_extract_content,toutiao_extract_content +import json + +import requests + +from bs4 import BeautifulSoup + +from get_web_content import wechat_extract_content, toutiao_w_extract_content, toutiao_extract_content + +from utils import handle_duplicate_files_advanced -title,article,imgs = wechat_extract_content("https://mp.weixin.qq.com/s/3KejJOMuY2y6LA5k1tNwcg") -# title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7491890368917602825/?log_from=ab01481cf63ba_1744526333347") +# title,article,imgs = wechat_extract_content("https://mp.weixin.qq.com/s/3KejJOMuY2y6LA5k1tNwcg") +# title,article,imgs = toutiao_w_extract_content("https://www.toutiao.com/w/1830082267985932/") +# title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7496132108239356479/") +# print(imgs) +# print(type(imgs)) -print("title:",title) -print("article",article) - -print("imgs",imgs) \ No newline at end of file +name = handle_duplicate_files_advanced(r"F:\work\code\python\ArticleReplaceBatch\articles","exeample.txt") +print(name[0]) \ No newline at end of file diff --git a/ArticleReplaceBatch/txt2docx.py b/ArticleReplaceBatch/txt2docx.py new file mode 100644 index 0000000..59b3914 --- /dev/null +++ b/ArticleReplaceBatch/txt2docx.py @@ -0,0 +1,340 @@ +import PySimpleGUI as sg +import json + +import os +import random +import re +from docx.shared import Pt, RGBColor +from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE +from docx.enum.text import WD_ALIGN_PARAGRAPH +from docx.oxml import OxmlElement +from docx.oxml.ns import qn +from docx.enum.style import WD_STYLE_TYPE +from docx import Document +from docx.shared import Inches +from PIL import Image + +# 保存文件路径的 JSON 文件 +SETTINGS_FILE = 'settings.json' + + +def set_picture_wrapping(paragraph): + """ + 设置图片环绕方式 + :param paragraph: + :return: + """ + # 设置图片环绕方式为上下环绕 + pPr = paragraph._element.get_or_add_pPr() + framePr = OxmlElement('w:framePr') + framePr.set(qn('w:wrap'), 'around') + framePr.set(qn('w:vAnchor'), 'text') + framePr.set(qn('w:hAnchor'), 'text') + pPr.append(framePr) + + +def format_word_document(input_filename, output_filename): + # 打开文档 + doc = Document(input_filename) + + # 创建或更新标题样式 + style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH) + style.font.name = '黑体' + style.font.size = Pt(22) # 二号字 + style.font.color.rgb = RGBColor(0, 0, 255) # 蓝色 + style.paragraph_format.space_after = Pt(12) # 标题后间距 + # 创建或更新正文样式 + style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH) + style.font.name = '仿宋' + style.font.size = Pt(14) # 四号字 + style.paragraph_format.first_line_indent = Pt(20) # 首行缩进两字符 + style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT + style.paragraph_format.line_spacing = 1.5 # 行间距 + style.paragraph_format.space_before = Pt(6) # 段前间距 + style.paragraph_format.space_after = Pt(6) # 段后间距 + + # 遍历所有段落 + for paragraph in doc.paragraphs: + # 设置标题格式 + if paragraph.style.name.startswith('Heading'): + paragraph.style = doc.styles['CustomHeading'] + + # 设置段落格式 + else: + paragraph.style = doc.styles['CustomBody'] + + # 遍历所有图片 + for rel in doc.part.rels.values(): + if "image" in rel.target_ref: + # 获取图片所在的段落 + for paragraph in doc.paragraphs: + for run in paragraph.runs: + if run._element.tag.endswith('}pict'): + # 设置图片居中 + paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER + # 设置图片环绕方式为上下环绕 + set_picture_wrapping(paragraph) + paragraph.paragraph_format.space_before = Pt(12) + paragraph.paragraph_format.space_after = Pt(12) + + # output_filename = remove_book_titles(output_filename) + + # 保存文档 + doc.save(output_filename) + + +def crop_and_replace_images(folder_path): + """ + 修改图片尺寸 + :param folder_path: + :return: + """ + folder_path = folder_path.strip() + # 遍历文件夹中的所有文件 + if not os.path.exists(folder_path): + os.mkdir(folder_path) + else: + for filename in os.listdir(folder_path): + if os.path.exists(filename): + # 检查文件扩展名是否为图片格式 + if filename.lower().endswith(('.jpg','.png')): + # 拼接完整的文件路径 + file_path = os.path.join(folder_path, filename) + print("文件夹路径:" + folder_path) + print("文件路径:" + file_path) + # 打开图片 + with Image.open(file_path) as img: + # 获取图片的尺寸 + width, height = img.size + # 裁剪图片,裁剪下方10px + cropped_img = img.crop((0, 0, width, height - (height * 0.2))) + # 保存裁剪后的图片,覆盖原文件 + output_path = file_path[0:file_path.find('.')] + '.png' + cropped_img.save(output_path, 'PNG') + + +def split_text_into_paragraphs(text): + """ + 将文本分割成段落,并在每个段落之间加一个空行 + :param text: 输入的文本 + :return: 段落列表 + """ + paragraphs = text.split('\n\n') + # 过滤掉空行和只包含空白字符的段落 + paragraphs = list(filter(lambda p: p.strip(), paragraphs)) + + # 在每个段落之间加一个空行 + paragraphs_with_blank_lines = [] + for paragraph in paragraphs: + paragraphs_with_blank_lines.append(paragraph) + paragraphs_with_blank_lines.append('') + + # 移除最后一个多余的空行 + if paragraphs_with_blank_lines: + paragraphs_with_blank_lines.pop() + + return paragraphs_with_blank_lines + + +def insert_images_into_paragraphs(paragraphs, image_folder, doc, title): + """ + 将图片插入到段落中 + :param paragraphs: + :param image_folder: + :param doc: + :return: + """ + + # 获取图片列表并排序 + images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if + img.lower().endswith(('jpg'))]) + # images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if + # # img.lower().endswith(('png', 'jpg', 'jpeg'))]) + + total_images = len(images) + + image_index = 0 + for i, paragraph in enumerate(paragraphs): + + if "正文:" in paragraph: + paragraph = paragraph.replace("正文:", '') + p = doc.add_paragraph(paragraph) + if os.path.exists(image_folder): + # 插入图片 + if image_index < total_images: + img_path = images[image_index] + + # 确保图片路径正确且图片文件存在 + if os.path.exists(img_path): + try: + with Image.open(img_path) as img: + width, height = img.size + doc.add_picture(img_path, width=Inches(width / height * 1.5)) + image_index += 1 + except Exception as e: + print(f"无法识别图像: {img_path}, 错误: {e}") + continue + else: + print(f"图片路径无效: {img_path}") + + +def create_word_document(text, image_folder, output_path, title): + """ + 创建Word文档 + :param text: + :param image_folder: + :param output_path: + :return: + """ + doc = Document() + paragraphs = split_text_into_paragraphs(text) + insert_images_into_paragraphs(paragraphs, image_folder, doc, title) + # modify_document(doc) + doc.save(output_path) + format_word_document(output_path, output_path) + print(f'文档已保存到: {output_path}') + + +# 读取指定路径下txt文本的内容 +def read_text_file(file_path): + """ + 读取指定路径下txt文本的内容 + :param file_path: + :return: + """ + with open(file_path, 'r', encoding='utf-8') as file: + return file.read() + + +def get_file_name(file_path): + """ + 获取文件名 + :param file_path: + :return: + """ + return os.path.basename(file_path) + + +def apply_random_style(paragraph): + # 预定义字体颜色列表 + predefined_font_colors = [ + RGBColor(255, 0, 0), # 红色 + RGBColor(255, 165, 0), # 橙色 + RGBColor(128, 0, 128), # 紫色 + ] + + # 预定义背景颜色列表(手动定义RGB颜色,避免太亮或太深) + predefined_bg_colors = [ + RGBColor(240, 240, 240), # 浅灰色 + RGBColor(255, 255, 224), # 浅黄色 + RGBColor(224, 255, 224), # 浅绿色 + RGBColor(224, 255, 255), # 浅青色 + RGBColor(255, 228, 225), # 浅粉色 + RGBColor(240, 248, 255), # 浅蓝色 + ] + + # 获取段落中的每一个run对象(代表一段连续的文字) + for run in paragraph.runs: + # 随机选择样式 + style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background']) + + if style_choice == 'bold': + run.bold = True + elif style_choice == 'italic': + run.italic = True + elif style_choice == 'underline': + run.underline = WD_UNDERLINE.SINGLE + elif style_choice == 'color': + # 从预定义颜色中随机选择一个颜色 + run.font.color.rgb = random.choice(predefined_font_colors) + elif style_choice == 'background': + # 从预定义背景颜色中随机选择一个颜色 + run.font.color.highlight_color = random.choice(predefined_bg_colors) + + +def txt2docx(txt_path, image_path, keep_txt=True): + file_path = txt_path + txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if + txt.lower().endswith(('txt'))]) + img_path = image_path + + for txt in txts: + print("正在修改:" + txt) + text = read_text_file(txt) + # print(text) + txt_name = get_file_name(txt) + title_name = txt_name.replace(".txt", "") + title = title_name + print(title) + if "正文:" in text: + new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "") + else: + new_text = text.replace("```markdown", "").replace("```", "") + content = new_text + image_folder = img_path + '\\' + txt_name.replace(".txt", "").rstrip(".") + # crop_and_replace_images(image_folder) + + create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name) + + # 根据用户选择决定是否删除原始txt文件 + if not keep_txt: + os.remove(txt) + print(f"已删除原始文件: {txt}") + else: + print(f"保留原始文件: {txt}") + + +# 加载设置 +def load_settings(): + if os.path.exists(SETTINGS_FILE): + with open(SETTINGS_FILE, 'r') as f: + return json.load(f) + return {'folder1': '', 'folder2': ''} + + +# 保存设置 +def save_settings(settings): + with open(SETTINGS_FILE, 'w') as f: + json.dump(settings, f) + + +# 自定义函数,用于处理用户选择的文件夹 +def process_folders(folder1, folder2, keep_txt=True): + # 在这里添加处理文件夹的代码 + txt2docx(folder1, folder2, keep_txt) + + +# 加载之前的设置 +settings = load_settings() +if 'keep_txt' not in settings: + settings['keep_txt'] = True + +# 定义窗口的布局 +layout = [ + [sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()], + [sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()], + [sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')], + [sg.Button('确认'), sg.Button('取消')] +] + +# 创建窗口 +window = sg.Window('文件夹选择窗口', layout) + +# 事件循环 +while True: + event, values = window.read() + if event == sg.WIN_CLOSED or event == '取消': # 如果用户关闭窗口或点击取消按钮 + break + elif event == '确认': # 如果用户点击确认按钮 + folder1 = values[0] + folder2 = values[1] + keep_txt = values['keep_txt'] + process_folders(folder1, folder2, keep_txt) + # 保存用户选择的文件夹路径和保留txt文件的选项 + settings['folder1'] = folder1 + settings['folder2'] = folder2 + settings['keep_txt'] = keep_txt + save_settings(settings) + +# 关闭窗口 +window.close() diff --git a/ArticleReplaceBatch/utils.py b/ArticleReplaceBatch/utils.py index aab4860..8ab3b47 100644 --- a/ArticleReplaceBatch/utils.py +++ b/ArticleReplaceBatch/utils.py @@ -99,3 +99,45 @@ def read_excel(file_name): return first_colunm_data + + +from typing import Tuple + + +def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]: + """ + 增强版:处理文件夹中的同名文件,支持更复杂的场景 + + 参数: + folder_path: 文件夹路径 + filename: 原始文件名 + + 返回: + Tuple[str, bool]: (处理后的文件名, 是否是重命名的) + """ + base, ext = os.path.splitext(filename) + target_path = os.path.join(folder_path, filename) + + if not os.path.exists(target_path): + return filename, False + + existing_files = set(os.listdir(folder_path)) + pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext))) + + # 找出所有匹配的文件并提取数字 + numbers = [] + for f in existing_files: + match = pattern.match(f) + if match: + num = int(match.group(2)) if match.group(2) else 0 + numbers.append(num) + + next_num = max(numbers) + 1 if numbers else 1 + new_filename = f"{base}_{next_num}{ext}" + + # 确保新文件名也不存在(处理并发情况) + while new_filename in existing_files: + next_num += 1 + new_filename = f"{base}_{next_num}{ext}" + + return new_filename, True \ No newline at end of file