修改获取网页内容代码

This commit is contained in:
太一 2025-05-06 17:04:38 +08:00
parent 2d377de6fd
commit 113c97c887
8 changed files with 1489 additions and 45 deletions

View File

@ -0,0 +1,691 @@
import sys # 导入sys模块
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
import time
import random
import threading
import tkinter as tk
from config import *
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText
import pymysql
from main_process_wtt import link_to_text, task_queue, result_queue
sys.setrecursionlimit(5000)
class ArticleReplaceApp(tk.Tk):
def __init__(self):
    """Create the main window, its two notebook pages, and the run-state fields."""
    super().__init__()
    self.title("文章采集与处理工具")
    self.geometry("900x600")

    # Notebook that hosts the main page and the configuration page.
    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

    # Create each page and register it as a tab, in display order.
    for attr_name, tab_label in (("main_frame", "主页面"), ("config_frame", "配置")):
        page = ttk.Frame(self.notebook)
        setattr(self, attr_name, page)
        self.notebook.add(page, text=tab_label)

    # Fill both pages with their widgets.
    self.init_main_frame()
    self.init_config_frame()

    # Run state; `running` is also written by the background worker.
    self.running = False
    self.thread = None
    self.total_links = 0
    self.processed_links = 0

    # Route the window-close button through on_close.
    self.protocol("WM_DELETE_WINDOW", self.on_close)
def init_main_frame(self):
    """Populate the main page: a control panel on the left, a log pane on the right."""
    cell = {"padx": 5, "pady": 5}  # padding shared by most grid cells below

    # ---- Left side: control panel ----
    panel = ttk.LabelFrame(self.main_frame, text="控制面板")
    panel.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10)

    # Row 0: Excel file picker.
    ttk.Label(panel, text="Excel文件:").grid(row=0, column=0, sticky=tk.W, **cell)
    self.excel_path_var = tk.StringVar(value=TITLE_BASE_PATH)
    excel_entry = ttk.Entry(panel, textvariable=self.excel_path_var, width=30)
    excel_entry.grid(row=0, column=1, **cell)
    browse_button = ttk.Button(panel, text="浏览", command=self.browse_excel)
    browse_button.grid(row=0, column=2, **cell)

    # Row 1: worker thread count, limited to MAX_THREADS.
    ttk.Label(panel, text="线程数:").grid(row=1, column=0, sticky=tk.W, **cell)
    self.thread_count_var = tk.StringVar(value="1")
    thread_spin = ttk.Spinbox(panel, from_=1, to=MAX_THREADS,
                              textvariable=self.thread_count_var, width=5)
    thread_spin.grid(row=1, column=1, sticky=tk.W, **cell)

    # Row 2: which AI workflow backend to use.
    ttk.Label(panel, text="工作流选择:").grid(row=2, column=0, sticky=tk.W, **cell)
    self.ai_service_var = tk.StringVar(value="dify")
    ai_service_combo = ttk.Combobox(panel, textvariable=self.ai_service_var,
                                    values=["dify", "coze"], width=10, state="readonly")
    ai_service_combo.grid(row=2, column=1, sticky=tk.W, **cell)

    # Row 3: start button.
    self.start_button = ttk.Button(panel, text="开始处理", command=self.start_processing)
    self.start_button.grid(row=3, column=0, columnspan=3, padx=5, pady=20)

    # Row 4: progress bar driven by progress_var (0-100).
    ttk.Label(panel, text="处理进度:").grid(row=4, column=0, sticky=tk.W, **cell)
    self.progress_var = tk.DoubleVar()
    progress_bar = ttk.Progressbar(panel, variable=self.progress_var, maximum=100)
    progress_bar.grid(row=4, column=1, columnspan=2, sticky=tk.EW, **cell)

    # ---- Right side: log pane ----
    log_frame = ttk.LabelFrame(self.main_frame, text="日志")
    log_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10)

    # Read-only text box; the log handler enables it briefly for each write.
    self.log_text = ScrolledText(log_frame, width=70, height=30)
    self.log_text.pack(fill=tk.BOTH, expand=True, **cell)
    self.log_text.config(state=tk.DISABLED)

    # Mirror INFO-and-above log records into the text box.
    self.log_handler = LogTextHandler(self.log_text)
    self.log_handler.setLevel(logging.INFO)
    self.log_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(self.log_handler)
def init_config_frame(self):
    """Build the configuration page: one sub-tab per settings group plus a save-all button."""
    config_notebook = ttk.Notebook(self.config_frame)
    config_notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    # (tab title, method that populates the tab), in display order.
    sections = (
        ("常规设置", self.init_general_config),
        ("数据库设置", self.init_database_config),
        ("Dify设置", self.init_dify_config),
        ("Coze设置", self.init_coze_config),
        ("百度API设置", self.init_baidu_config),
        ("图片处理设置", self.init_image_config),
        ("违禁词设置", self.init_keywords_config),
    )

    # Create every frame first, then register the tabs, then populate them.
    frames = [ttk.Frame(config_notebook) for _ in sections]
    for (tab_title, _), frame in zip(sections, frames):
        config_notebook.add(frame, text=tab_title)
    for (_, populate), frame in zip(sections, frames):
        populate(frame)

    # One button that saves every settings group.
    save_button = ttk.Button(self.config_frame, text="保存所有配置",
                             command=self.save_all_configs)
    save_button.pack(side=tk.RIGHT, padx=10, pady=10)
def init_general_config(self, parent):
    """Build the general-settings tab: three directory pickers, the default Excel file, max threads.

    :param parent: frame that receives the widgets
    """
    cell = {"padx": 5, "pady": 5}

    # Rows 0-2: (label, attribute that stores the StringVar, key in CONFIG['General']).
    directory_rows = (
        ("Chrome用户目录:", "chrome_dir_var", 'chrome_user_dir'),
        ("文章保存路径:", "articles_path_var", 'articles_path'),
        ("图片保存路径:", "images_path_var", 'images_path'),
    )
    for row, (caption, attr_name, key) in enumerate(directory_rows):
        ttk.Label(parent, text=caption).grid(row=row, column=0, sticky=tk.W, **cell)
        var = tk.StringVar(value=CONFIG['General'][key])
        setattr(self, attr_name, var)
        ttk.Entry(parent, textvariable=var, width=50).grid(row=row, column=1, **cell)
        # Bind `var` as a default argument so each button keeps its own variable.
        browse = ttk.Button(parent, text="浏览",
                            command=lambda v=var: self.browse_directory(v))
        browse.grid(row=row, column=2, **cell)

    # Row 3: default Excel file.
    ttk.Label(parent, text="默认Excel文件:").grid(row=3, column=0, sticky=tk.W, **cell)
    self.excel_file_var = tk.StringVar(value=CONFIG['General']['title_file'])
    ttk.Entry(parent, textvariable=self.excel_file_var, width=50).grid(row=3, column=1, **cell)
    excel_types = [("Excel文件", "*.xlsx"), ("所有文件", "*.*")]
    excel_browse = ttk.Button(
        parent, text="浏览",
        command=lambda: self.browse_file(self.excel_file_var, excel_types))
    excel_browse.grid(row=3, column=2, **cell)

    # Row 4: maximum thread count.
    ttk.Label(parent, text="最大线程数:").grid(row=4, column=0, sticky=tk.W, **cell)
    self.max_threads_var = tk.StringVar(value=CONFIG['General']['max_threads'])
    max_threads_spin = ttk.Spinbox(parent, from_=1, to=10,
                                   textvariable=self.max_threads_var, width=5)
    max_threads_spin.grid(row=4, column=1, sticky=tk.W, **cell)
def init_database_config(self, parent):
    """Build the database tab: host, user, password, database name and a test button.

    :param parent: frame that receives the widgets
    """
    cell = {"padx": 5, "pady": 5}

    # (label, attribute that stores the StringVar, key in CONFIG['Database'], extra Entry options)
    rows = (
        ("数据库主机:", "db_host_var", 'host', {}),
        ("数据库用户名:", "db_user_var", 'user', {}),
        ("数据库密码:", "db_password_var", 'password', {"show": "*"}),  # masked input
        ("数据库名称:", "db_name_var", 'database', {}),
    )
    for row, (caption, attr_name, key, entry_options) in enumerate(rows):
        ttk.Label(parent, text=caption).grid(row=row, column=0, sticky=tk.W, **cell)
        var = tk.StringVar(value=CONFIG['Database'][key])
        setattr(self, attr_name, var)
        entry = ttk.Entry(parent, textvariable=var, width=30, **entry_options)
        entry.grid(row=row, column=1, **cell)

    # Button that tries a connection with the values currently in the form.
    test_button = ttk.Button(parent, text="测试连接", command=self.test_db_connection)
    test_button.grid(row=4, column=1, padx=5, pady=10, sticky=tk.E)
def init_dify_config(self, parent):
    """Build the Dify tab: API key, user id, URL and the input-data template.

    :param parent: frame that receives the widgets
    """
    cell = {"padx": 5, "pady": 5}

    # Rows 0-2: (label, attribute that stores the StringVar, key in CONFIG['Dify'], entry width).
    required_rows = (
        ("API Key:", "dify_api_key_var", 'api_key', 50),
        ("User ID:", "dify_user_id_var", 'user_id', 30),
        ("URL:", "dify_url_var", 'url', 50),
    )
    for row, (caption, attr_name, key, width) in enumerate(required_rows):
        ttk.Label(parent, text=caption).grid(row=row, column=0, sticky=tk.W, **cell)
        var = tk.StringVar(value=CONFIG['Dify'][key])
        setattr(self, attr_name, var)
        ttk.Entry(parent, textvariable=var, width=width).grid(row=row, column=1, **cell)

    # Row 3: input-data template; falls back to a default when the key is absent.
    ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, sticky=tk.W, **cell)
    template = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
    self.dify_input_data_template_var = tk.StringVar(value=template)
    template_entry = ttk.Entry(parent, textvariable=self.dify_input_data_template_var, width=50)
    template_entry.grid(row=3, column=1, **cell)
def init_coze_config(self, parent):
    """Build the Coze tab: workflow id, access token, async flag and the input-data template.

    :param parent: frame that receives the widgets
    """
    # Row 0: workflow id.
    ttk.Label(parent, text="Workflow ID:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
    self.coze_workflow_id_var = tk.StringVar(value=CONFIG['Coze']['workflow_id'])
    ttk.Entry(parent, textvariable=self.coze_workflow_id_var, width=50).grid(row=0, column=1, padx=5, pady=5)

    # Row 1: access token.
    ttk.Label(parent, text="Access Token:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
    self.coze_access_token_var = tk.StringVar(value=CONFIG['Coze']['access_token'])
    ttk.Entry(parent, textvariable=self.coze_access_token_var, width=50).grid(row=1, column=1, padx=5, pady=5)

    # Row 2: async flag, stored as the strings "true"/"false".
    ttk.Label(parent, text="Is Async:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
    self.coze_is_async_var = tk.StringVar(value=CONFIG['Coze']['is_async'])
    ttk.Combobox(parent, textvariable=self.coze_is_async_var, values=["true", "false"], width=10,
                 state="readonly").grid(row=2, column=1, padx=5, pady=5, sticky=tk.W)

    # Row 3: input-data template.
    # The fallback uses single braces so it is valid JSON, matching the Dify
    # default; the previous doubled-brace form ('{{...}}') cannot be parsed
    # by json.loads.
    # NOTE(review): the link-processing code appears to json.loads this
    # template and then str.format each value - confirm against the consumer.
    ttk.Label(parent, text="Input Data模板:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
    default_template = '{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}'
    self.coze_input_data_template_var = tk.StringVar(
        value=CONFIG['Coze'].get('input_data_template', default_template))
    ttk.Entry(parent, textvariable=self.coze_input_data_template_var, width=50).grid(row=3, column=1, padx=5, pady=5)
def init_baidu_config(self, parent):
    """Build the Baidu API tab: API key and secret key.

    :param parent: frame that receives the widgets
    """
    # (label, attribute that stores the StringVar, key in CONFIG['Baidu'])
    rows = (
        ("API Key:", "baidu_api_key_var", 'api_key'),
        ("Secret Key:", "baidu_secret_key_var", 'secret_key'),
    )
    for row, (caption, attr_name, key) in enumerate(rows):
        ttk.Label(parent, text=caption).grid(row=row, column=0, padx=5, pady=5, sticky=tk.W)
        var = tk.StringVar(value=CONFIG['Baidu'][key])
        setattr(self, attr_name, var)
        ttk.Entry(parent, textvariable=var, width=50).grid(row=row, column=1, padx=5, pady=5)
def init_image_config(self, parent):
    """Build the image-processing tab: crop/rotation/brightness on the left, watermark/overlay on the right.

    :param parent: frame that receives the widgets
    """
    # (label, attribute that stores the StringVar, key in CONFIG['ImageModify'],
    #  grid row, label column, entry width, entry sticky or None)
    fields = (
        ("裁剪百分比:", "crop_percent_var", 'crop_percent', 0, 0, 10, tk.W),
        ("最小旋转角度:", "min_rotation_var", 'min_rotation', 1, 0, 10, tk.W),
        ("最大旋转角度:", "max_rotation_var", 'max_rotation', 2, 0, 10, tk.W),
        ("最小亮度:", "min_brightness_var", 'min_brightness', 3, 0, 10, tk.W),
        ("最大亮度:", "max_brightness_var", 'max_brightness', 4, 0, 10, tk.W),
        ("水印文字:", "watermark_text_var", 'watermark_text', 0, 2, 30, None),
        ("水印透明度:", "watermark_opacity_var", 'watermark_opacity', 1, 2, 10, tk.W),
        ("蒙版透明度:", "overlay_opacity_var", 'overlay_opacity', 2, 2, 10, tk.W),
    )
    for caption, attr_name, key, row, label_column, width, entry_sticky in fields:
        ttk.Label(parent, text=caption).grid(row=row, column=label_column,
                                             padx=5, pady=5, sticky=tk.W)
        var = tk.StringVar(value=CONFIG['ImageModify'][key])
        setattr(self, attr_name, var)
        # The entry sits in the column right after its label.
        entry_grid = {"row": row, "column": label_column + 1, "padx": 5, "pady": 5}
        if entry_sticky is not None:
            entry_grid["sticky"] = entry_sticky
        ttk.Entry(parent, textvariable=var, width=width).grid(**entry_grid)

    # Button that renders a sample image with the values currently in the form.
    preview_button = ttk.Button(parent, text="预览效果", command=self.preview_image_effect)
    preview_button.grid(row=4, column=3, padx=5, pady=5, sticky=tk.E)
def init_keywords_config(self, parent):
    """Build the banned-words tab: a text box showing one word per line, plus a save button.

    :param parent: frame that receives the widgets
    """
    ttk.Label(parent, text="违禁词列表:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)

    # The config stores the words comma-separated; show them one per line.
    self.banned_words_text = ScrolledText(parent, width=60, height=15)
    self.banned_words_text.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky=tk.NSEW)
    stored_words = CONFIG['Keywords']['banned_words']
    self.banned_words_text.insert(tk.END, stored_words.replace(',', '\n'))

    save_button = ttk.Button(parent, text="保存违禁词", command=self.save_banned_words)
    save_button.grid(row=2, column=1, padx=5, pady=5, sticky=tk.E)

    # Let the text box absorb any extra space when the window is resized.
    parent.columnconfigure(0, weight=1)
    parent.rowconfigure(1, weight=1)
def save_banned_words(self):
    """Store the banned-word list from the text box in CONFIG and write it to disk.

    The editor holds one word per line; the config stores them comma-separated.
    Surrounding whitespace is stripped and blank lines are dropped so the stored
    value never contains empty entries.
    """
    raw_text = self.banned_words_text.get(1.0, tk.END)
    cleaned = (line.strip() for line in raw_text.splitlines())
    CONFIG['Keywords']['banned_words'] = ','.join(word for word in cleaned if word)
    try:
        # Persist immediately so the success dialog is truthful; the same
        # save_config call is used by save_all_configs.
        save_config(CONFIG)
    except Exception as e:
        messagebox.showerror("保存失败", f"保存配置时出错:{e}")
        return
    messagebox.showinfo("保存成功", "违禁词列表已更新")
def browse_directory(self, var):
    """Open a directory chooser and store the selection in `var`.

    `var` is left unchanged when the dialog returns an empty value.

    :param var: object with a ``set`` method (a tk variable) that receives the path
    """
    chosen = filedialog.askdirectory()
    if not chosen:
        return
    var.set(chosen)
def browse_file(self, var, filetypes):
    """Open a file chooser and store the selection in `var`.

    `var` is left unchanged when the dialog returns an empty value.

    :param var: object with a ``set`` method (a tk variable) that receives the path
    :param filetypes: file-type filters passed straight to the dialog
    """
    chosen = filedialog.askopenfilename(filetypes=filetypes)
    if not chosen:
        return
    var.set(chosen)
def browse_excel(self):
    """Open a file chooser for the Excel file and store the selection in excel_path_var.

    The variable is left unchanged when the dialog returns an empty value.
    """
    excel_types = [("Excel文件", "*.xlsx"), ("所有文件", "*.*")]
    chosen = filedialog.askopenfilename(filetypes=excel_types)
    if not chosen:
        return
    self.excel_path_var.set(chosen)
def test_db_connection(self):
    """Open and immediately close a MySQL connection using the values in the form.

    Shows an info dialog on success and an error dialog with the exception
    text on any failure.
    """
    try:
        credentials = {
            "host": self.db_host_var.get(),
            "user": self.db_user_var.get(),
            "password": self.db_password_var.get(),
            "database": self.db_name_var.get(),
        }
        # Only reachability matters here, so the connection is closed right away.
        pymysql.connect(**credentials).close()
        messagebox.showinfo("连接成功", "数据库连接测试成功!")
    except Exception as e:
        messagebox.showerror("连接失败", f"数据库连接测试失败:{e}")
def preview_image_effect(self):
    """Render a generated sample picture with the current image settings and show it.

    Any failure is reported in an error dialog.
    """
    try:
        # Sample picture: light-grey canvas, darker rectangle, short caption.
        sample = Image.new('RGB', (400, 300), color=(240, 240, 240))
        pen = ImageDraw.Draw(sample)
        pen.rectangle([50, 50, 350, 250], fill=(200, 200, 200))
        pen.text((150, 140), "示例图片", fill=(0, 0, 0))

        # Apply the configured effects, then open the preview window.
        processed = self.apply_image_modifications(sample)
        self.show_preview_image(processed)
    except Exception as e:
        messagebox.showerror("预览失败", f"生成预览图片时出错:{e}")
def apply_image_modifications(self, img):
    """Apply the image effects currently configured in the form.

    Steps, in order: crop the edges, rotate by a random angle, adjust
    brightness by a random factor, draw a semi-transparent text watermark in
    the bottom-right corner, and blend a semi-transparent white overlay.

    :param img: PIL image to process
    :return: the processed image converted to RGB; if any step fails, an error
             dialog is shown and the image as it stood at that point is returned
    """
    width, height = img.size
    try:
        # Read the parameters from the form.
        crop_percent = float(self.crop_percent_var.get())
        min_rotation = float(self.min_rotation_var.get())
        max_rotation = float(self.max_rotation_var.get())
        min_brightness = float(self.min_brightness_var.get())
        max_brightness = float(self.max_brightness_var.get())
        watermark_text = self.watermark_text_var.get()
        watermark_opacity = int(self.watermark_opacity_var.get())
        overlay_opacity = int(self.overlay_opacity_var.get())

        # 1. Crop the same fraction off every edge.
        crop_px_w = int(width * crop_percent)
        crop_px_h = int(height * crop_percent)
        img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))

        # 2. Rotate by a random angle in either direction.
        angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
        img = img.rotate(angle, expand=True)

        # 3. Scale brightness by a random factor.
        factor = random.uniform(min_brightness, max_brightness)
        img = ImageEnhance.Brightness(img).enhance(factor)

        # 4. Text watermark. Drawing an RGBA colour straight onto an RGB image
        #    ignores the alpha channel, so the text is drawn on a transparent
        #    layer and alpha-composited; this makes watermark_opacity effective.
        if img.mode != 'RGBA':
            img = img.convert('RGBA')
        text_layer = Image.new('RGBA', img.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(text_layer)
        font_size = max(20, int(min(img.size) * 0.05))
        try:
            font = ImageFont.truetype("arial.ttf", font_size)
        except (OSError, ImportError):
            # Font file missing or FreeType support unavailable.
            font = ImageFont.load_default()
        # Right/bottom edge of the text box when drawn at the origin.
        text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:]
        # Keep a 5 px margin from the bottom-right corner.
        x = img.size[0] - text_width - 5
        y = img.size[1] - text_height - 5
        draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, watermark_opacity))
        img = Image.alpha_composite(img, text_layer)

        # 5. Semi-transparent white overlay.
        overlay = Image.new('RGBA', img.size, (255, 255, 255, overlay_opacity))
        img = Image.alpha_composite(img, overlay)
        return img.convert('RGB')
    except Exception as e:
        messagebox.showerror("参数错误", f"应用图片修改时出错:{e}")
        return img
def show_preview_image(self, img):
    """Open a child window that displays `img` with a close button.

    :param img: PIL image to display
    """
    window = tk.Toplevel(self)
    window.title("图片效果预览")
    window.geometry("500x400")

    # Convert the PIL image into something Tk can display.
    from PIL import ImageTk
    photo = ImageTk.PhotoImage(img)

    picture = tk.Label(window, image=photo)
    picture.image = photo  # keep a reference on the widget alongside the image
    picture.pack(padx=10, pady=10)

    close_button = ttk.Button(window, text="关闭", command=window.destroy)
    close_button.pack(pady=10)
def save_all_configs(self):
    """Copy every form value into CONFIG, write it to disk, and refresh derived state.

    After saving, this module's path and thread globals are updated and the
    article and image directories are created if missing. Any failure is shown
    in an error dialog.
    """
    global USER_DIR_PATH, ARTICLES_BASE_PATH, IMGS_BASE_PATH, TITLE_BASE_PATH, MAX_THREADS

    # (config section, config key, attribute holding the tk variable), in write order.
    field_map = (
        ('General', 'chrome_user_dir', 'chrome_dir_var'),
        ('General', 'articles_path', 'articles_path_var'),
        ('General', 'images_path', 'images_path_var'),
        ('General', 'title_file', 'excel_file_var'),
        ('General', 'max_threads', 'max_threads_var'),
        ('Database', 'host', 'db_host_var'),
        ('Database', 'user', 'db_user_var'),
        ('Database', 'password', 'db_password_var'),
        ('Database', 'database', 'db_name_var'),
        ('Dify', 'api_key', 'dify_api_key_var'),
        ('Dify', 'user_id', 'dify_user_id_var'),
        ('Dify', 'url', 'dify_url_var'),
        ('Dify', 'input_data_template', 'dify_input_data_template_var'),
        ('Coze', 'workflow_id', 'coze_workflow_id_var'),
        ('Coze', 'access_token', 'coze_access_token_var'),
        ('Coze', 'is_async', 'coze_is_async_var'),
        ('Coze', 'input_data_template', 'coze_input_data_template_var'),
        ('Baidu', 'api_key', 'baidu_api_key_var'),
        ('Baidu', 'secret_key', 'baidu_secret_key_var'),
        ('ImageModify', 'crop_percent', 'crop_percent_var'),
        ('ImageModify', 'min_rotation', 'min_rotation_var'),
        ('ImageModify', 'max_rotation', 'max_rotation_var'),
        ('ImageModify', 'min_brightness', 'min_brightness_var'),
        ('ImageModify', 'max_brightness', 'max_brightness_var'),
        ('ImageModify', 'watermark_text', 'watermark_text_var'),
        ('ImageModify', 'watermark_opacity', 'watermark_opacity_var'),
        ('ImageModify', 'overlay_opacity', 'overlay_opacity_var'),
    )
    try:
        for section, key, var_name in field_map:
            CONFIG[section][key] = getattr(self, var_name).get()

        # Write the updated configuration to disk.
        save_config(CONFIG)

        # Refresh this module's globals from the saved values.
        general = CONFIG['General']
        USER_DIR_PATH = general['chrome_user_dir']
        ARTICLES_BASE_PATH = general['articles_path']
        IMGS_BASE_PATH = general['images_path']
        TITLE_BASE_PATH = general['title_file']
        MAX_THREADS = int(general['max_threads'])

        # Create the output directories if they are missing.
        for directory in (ARTICLES_BASE_PATH, IMGS_BASE_PATH):
            if not os.path.exists(directory):
                os.makedirs(directory)

        messagebox.showinfo("保存成功", "所有配置已保存")
    except Exception as e:
        messagebox.showerror("保存失败", f"保存配置时出错:{e}")
def start_processing(self):
    """Validate the inputs and start the link-processing job on a background thread.

    Shows an info dialog and returns if a job is already running, and an error
    dialog if the Excel file does not exist. Otherwise it disables the start
    button, clears the log, starts a daemon worker thread and schedules the
    progress polling.
    """
    if self.running:
        messagebox.showinfo("处理中", "已有任务正在处理中,请等待完成")
        return
    try:
        # The Excel file must exist before anything is started.
        excel_path = self.excel_path_var.get()
        if not os.path.exists(excel_path):
            messagebox.showerror("文件错误", f"Excel文件不存在{excel_path}")
            return

        # Thread count: clamp to at least 1 and at most MAX_THREADS; fall back
        # to 1 on unparsable input. Only conversion/comparison errors are
        # caught, so KeyboardInterrupt and SystemExit still propagate.
        try:
            requested = int(self.thread_count_var.get())
            num_threads = max(1, min(requested, MAX_THREADS))
        except (TypeError, ValueError):
            num_threads = 1

        # Block a second start while this job runs.
        self.start_button.config(state=tk.DISABLED)
        self.running = True

        # Clear the log (the widget is read-only except while being edited).
        self.log_text.config(state=tk.NORMAL)
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state=tk.DISABLED)

        ai_service = self.ai_service_var.get()

        # Run the job on a daemon thread so the UI stays responsive.
        self.thread = threading.Thread(target=self.run_processing,
                                       args=(excel_path, num_threads, ai_service))
        self.thread.daemon = True
        self.thread.start()

        # Start polling progress.
        self.after(100, self.update_progress)
    except Exception as e:
        messagebox.showerror("启动失败", f"启动处理任务时出错:{e}")
        self.start_button.config(state=tk.NORMAL)
        self.running = False
def run_processing(self, excel_path, num_threads, ai_service):
    """Run the link-processing job; intended to be the background thread's target.

    Logs a summary and schedules a result dialog via ``self.after``. On failure
    it logs the error and schedules an error dialog. In every case it schedules
    re-enabling the start button and clears ``self.running``.

    :param excel_path: path of the Excel file selected in the UI
    :param num_threads: number of worker threads to pass to link_to_text
    :param ai_service: AI backend name passed to link_to_text
    """
    try:
        # NOTE(review): this rebinds TITLE_BASE_PATH in this module only;
        # confirm that link_to_text actually reads the updated value.
        global TITLE_BASE_PATH
        TITLE_BASE_PATH = excel_path

        start_time = time.time()

        logger.info(f"开始处理链接,使用 {num_threads} 个线程")
        results = link_to_text(num_threads=num_threads, ai_service=ai_service)

        # Each result is a 3-tuple whose second item is the success flag.
        total_links = len(results)
        success_links = sum(1 for _, success, _ in results if success)

        elapsed_time = time.time() - start_time

        logger.info(
            f"处理完成,共处理 {total_links} 个链接,成功 {success_links} 个,失败 {total_links - success_links}")
        logger.info(f"总耗时: {elapsed_time:.2f}")

        # Build the text now so the deferred callback only has to display it.
        summary = f"共处理 {total_links} 个链接\n成功: {success_links}\n失败: {total_links - success_links}\n总耗时: {elapsed_time:.2f}"
        self.after(0, lambda: messagebox.showinfo("处理完成", summary))
    except Exception as e:
        logger.error(f"处理任务出错: {e}")
        # Python unbinds `e` when the except block ends, so a lambda that
        # formats `e` later would raise NameError. Format the message here.
        error_text = f"处理任务出错:{e}"
        self.after(0, lambda: messagebox.showerror("处理失败", error_text))
    finally:
        # Re-enable the start button and mark the job as finished.
        self.after(0, lambda: self.start_button.config(state=tk.NORMAL))
        self.running = False
def update_progress(self):
    """Refresh the progress bar and window title, then reschedule itself in 500 ms.

    Stops rescheduling once ``self.running`` is false, or after an error
    (which is logged).
    """
    if not self.running:
        return
    try:
        # Progress is estimated from the sizes of the two queues.
        total = task_queue.qsize() + result_queue.qsize()
        done = result_queue.qsize()
        if total > 0:
            percent = (done / total) * 100
            self.progress_var.set(percent)
            self.title(f"文章采集与处理工具 - 进度: {percent:.1f}%")
        # Poll again in half a second.
        self.after(500, self.update_progress)
    except Exception as e:
        logger.error(f"更新进度出错: {e}")
def on_close(self):
    """Handle the window-close request; ask for confirmation while a job is running."""
    # Keep the window open only if a job is running and the user declines.
    if self.running and not messagebox.askyesno("确认退出", "任务正在处理中,确定要退出吗?"):
        return
    self.destroy()
# 日志处理器类,用于将日志输出到文本框
class LogTextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget."""

    def __init__(self, text_widget):
        """
        :param text_widget: widget that receives the log lines; it must provide
                            ``after``, ``configure``, ``insert`` and ``see``
        """
        logging.Handler.__init__(self)
        self.text_widget = text_widget

    def emit(self, record):
        """Format `record` and schedule it to be appended to the widget.

        The write is scheduled with ``after(0, ...)`` rather than performed
        directly. Failures while formatting or scheduling (for example when the
        widget no longer exists) go through ``handleError`` so that a logging
        call never raises into the code that is doing the logging.
        """
        try:
            msg = self.format(record)

            def append():
                # The widget is kept read-only; enable it just for this write.
                self.text_widget.configure(state=tk.NORMAL)
                self.text_widget.insert(tk.END, msg + '\n')
                self.text_widget.see(tk.END)  # scroll to the newest line
                self.text_widget.configure(state=tk.DISABLED)

            self.text_widget.after(0, append)
        except Exception:
            self.handleError(record)
# 主函数
def main():
    """Configure logging, make sure the output directories exist, then run the GUI."""
    # Log to both a UTF-8 file and the console.
    file_handler = logging.FileHandler("article_replace.log", encoding='utf-8')
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )

    # Create the article and image directories if they are missing.
    for directory in (ARTICLES_BASE_PATH, IMGS_BASE_PATH):
        if not os.path.exists(directory):
            os.makedirs(directory)

    # Start the GUI and block until the window is closed.
    ArticleReplaceApp().mainloop()
if __name__ == "__main__":
main()

View File

@ -37,7 +37,6 @@ def call_dify_workflow(input_data):
return article return article
# ==========================调用coze工作流========================== # ==========================调用coze工作流==========================
@ -54,17 +53,18 @@ def call_coze_workflow(parameters):
is_async = CONFIG['Coze']['is_async'].lower() == 'true' is_async = CONFIG['Coze']['is_async'].lower() == 'true'
url = "https://api.coze.cn/v1/workflow/run" url = "https://api.coze.cn/v1/workflow/run"
headers = { headers = {
"Authorization": f"Bearer {access_token}", "Authorization": f"Bearer {access_token}",
"Content-Type": "application/json" "Content-Type": "application/json"
} }
data = { data = {
"workflow_id": workflow_id, "workflow_id": workflow_id,
"parameters": parameters, "parameters": parameters,
"is_async": is_async "is_async": is_async
} }
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
if response.status_code == 200: if response.status_code == 200:
@ -78,3 +78,49 @@ def call_coze_workflow(parameters):
"detail": response.text "detail": response.text
} }
def call_coze_article_workflow(parameters):
    """
    Run the configured Coze workflow and return its ``output`` value.

    The workflow id, access token and async flag are read from CONFIG['Coze'].

    :param parameters: dict of input parameters passed to the workflow
    :return: the ``output`` value from the response's ``data`` payload on HTTP 200;
             otherwise a dict with ``error`` and ``detail`` keys
    :raises ValueError: if the response body (or its ``data`` string) is not valid JSON
    :raises KeyError: if ``data`` or ``output`` is missing from the response
    """
    import json

    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'

    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code != 200:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

    # The body is JSON, so it is parsed with the JSON parser: ast.literal_eval
    # rejects true/false/null and decodes JSON escapes differently.
    result_dict = json.loads(response.text)
    payload = result_dict['data']
    # `data` was parsed as a string containing a second JSON document; a value
    # that is already decoded is accepted as well.
    data_dict = json.loads(payload) if isinstance(payload, str) else payload
    return data_dict['output']

View File

@ -10,6 +10,8 @@ from config import *
from utils import safe_open_directory from utils import safe_open_directory
IMGS_BASE_PATH = CONFIG['General']['images_path'] IMGS_BASE_PATH = CONFIG['General']['images_path']
def crop_and_replace_images(folder_path): def crop_and_replace_images(folder_path):
""" """
修改图片尺寸 修改图片尺寸
@ -89,7 +91,10 @@ def download_and_process_images(img_urls, article_title):
safe_open_directory(img_dir_path) safe_open_directory(img_dir_path)
for i, img_url in enumerate(img_urls): for i, img_url in enumerate(img_urls):
imgurl = "https:" + img_url if img_url.startswith("https"):
imgurl = img_url
else:
imgurl = "https:"+img_url
img_path = os.path.join(img_dir_path, f"图片{i}.jpg") img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
try: try:
download_image(imgurl, img_path) download_image(imgurl, img_path)

View File

@ -1,8 +1,8 @@
import threading import threading
import queue import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow, call_coze_workflow
from ai_studio import call_dify_workflow, call_coze_workflow,call_coze_article_workflow
from databases import * from databases import *
from images_edit import download_and_process_images from images_edit import download_and_process_images
@ -20,9 +20,7 @@ def process_link(link, ai_service):
""" """
try: try:
if link.startswith("https://www.toutiao.com"): if link.startswith("https://www.toutiao.com"):
title_text, article_text, img_urls = toutiao_w_extract_content(link) title_text, article_text, img_urls = toutiao_extract_content(link)
if title_text == "":
title_text, article_text, img_urls = toutiao_extract_content(link)
elif link.startswith("https://mp.weixin.qq.co"): elif link.startswith("https://mp.weixin.qq.co"):
title_text, article_text, img_urls = wechat_extract_content(link) title_text, article_text, img_urls = wechat_extract_content(link)
else: else:
@ -44,8 +42,7 @@ def process_link(link, ai_service):
title = extract_content_until_punctuation(article_text).replace("正文:", "") title = extract_content_until_punctuation(article_text).replace("正文:", "")
print(title) logger.info(img_urls)
print(article_text)
from datetime import datetime from datetime import datetime
@ -54,6 +51,7 @@ def process_link(link, ai_service):
# 打印当前时间 # 打印当前时间
print("当前时间:", current_time) print("当前时间:", current_time)
logger.info(title_text)
if ai_service == "dify": if ai_service == "dify":
if check_keywords: if check_keywords:
@ -61,7 +59,7 @@ def process_link(link, ai_service):
check_link_insert(host, user, password, database, link) check_link_insert(host, user, password, database, link)
return return
# 从配置加载 input_data 模板 # 从配置加载 input_data 模板
input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}') input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"title": "{article_text}"}}')
try: try:
# 解析模板字符串为字典 # 解析模板字符串为字典
input_data_template = json.loads(input_data_template_str) input_data_template = json.loads(input_data_template_str)
@ -83,50 +81,53 @@ def process_link(link, ai_service):
if check_keywords: if check_keywords:
weijin = "违禁" weijin = "违禁"
# 从配置加载 Coze input_data 模板 # 从配置加载 Coze input_data 模板
input_data_template_str = CONFIG['Coze'].get('input_data_template', # input_data_template_str = CONFIG['Coze'].get('input_data_template',
'{{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}}') # f'{{"title": "{title_text}"}}')
try: # try:
# 解析模板字符串为字典 # # 解析模板字符串为字典
input_data_template = json.loads(input_data_template_str) # input_data_template = json.loads(input_data_template_str)
# 使用实际变量格式化模板 # # 使用实际变量格式化模板
input_data = {k: v.format(article_text=article_text, link=link, weijin=weijin) for k, v in # input_data = {k: v.format(article_text=article_text) for k, v in
input_data_template.items()} # input_data_template.items()}
except (json.JSONDecodeError, KeyError, AttributeError) as e: # except (json.JSONDecodeError, KeyError, AttributeError) as e:
logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.") # logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.")
input_data = { # input_data = {
"article": article_text, # "title": title_text
"link": link, #
"weijin": weijin # }
}
input_data = {
"title": title_text
}
message_content = call_coze_article_workflow(input_data)
# message_content = msg['result']
msg = call_coze_workflow(input_data)
message_content = msg['article']
result = msg['result']
if result == "已经创作过":
return
# 获取当前时间并格式化 # 获取当前时间并格式化
current_time = datetime.now().strftime("%H:%M:%S") current_time = datetime.now().strftime("%H:%M:%S")
# 打印当前时间 # 打印当前时间
print("当前时间:", current_time) print("当前时间:", current_time)
finally_article = message_content.replace("正文:", "") + "\n" # finally_article = message_content.replace("正文:", "") + "\n"
article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt") file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title_text)
if '*' in finally_article or '#' in finally_article or "-" in finally_article: article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{file_name}.txt")
if '*' in message_content or '#' in message_content or "-" in message_content:
# 使用正则表达式一次性替换多个字符 # 使用正则表达式一次性替换多个字符
old_content = re.sub(r'[*#-]', '', message_content) old_content = re.sub(r'[*#-]', '', message_content)
else: else:
# 如果不需要替换,直接使用原内容 # 如果不需要替换,直接使用原内容
old_content = finally_article old_content = message_content
print("改写完成的文章:" + old_content) print("改写完成的文章:" + old_content)
# 删除AI词汇 # 删除AI词汇
content = old_content content = old_content
check_link_insert(host, user, password, database, link) # check_link_insert(host, user, password, database, link)
# 判断文章合规度 # 判断文章合规度
if text_detection(content) == "合规": if text_detection(content) == "合规":
@ -141,14 +142,65 @@ def process_link(link, ai_service):
logging.info('文本已经保存') logging.info('文本已经保存')
if img_urls: if img_urls:
download_and_process_images(img_urls, title) download_and_process_images(img_urls, file_name)
except Exception as e: except Exception as e:
logging.error(f"处理链接 {link} 时出错: {e}") logging.error(f"处理链接 {link} 时出错: {e}")
raise raise
def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"): def link_to_text(num_threads=None, ai_service="dify"):
use_link_path = 'use_link_path.txt'
# 读取链接
links = read_excel(TITLE_BASE_PATH)
# 过滤已处理的链接
filtered_links = []
host = CONFIG['Database']['host']
user = CONFIG['Database']['user']
password = CONFIG['Database']['password']
database = CONFIG['Database']['database']
# for link in links:
# logging.info(f"总共{len(links)}个链接")
# if check_link_exists(host, user, password, database, link):
# logger.info(f"链接已存在: {link}")
# continue
# else:
# filtered_links.append(link)
# logger.info(f"链接不存在: {link}")
# print("链接不存在,存储到过滤器中:", link)
for link in links:
logging.info(f"总共{len(links)}个链接")
filtered_links.append(link)
# if check_link_exists(host, user, password, database, link):
# logger.info(f"链接已存在: {link}")
# continue
# else:
# filtered_links.append(link)
# logger.info(f"链接不存在: {link}")
# print("链接不存在,存储到过滤器中:", link)
if not filtered_links:
logger.info("没有新链接需要处理")
return []
# 使用多线程处理链接
results = process_links_with_threads(filtered_links, num_threads, ai_service)
# 记录已处理的链接
with open(use_link_path, 'a+', encoding='utf-8') as f:
for link, success, _ in results:
if success:
f.write(link + "\n")
return results
def link_to_mysql_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"):
use_link_path = 'use_link_path.txt' use_link_path = 'use_link_path.txt'
# 读取链接 # 读取链接

View File

@ -0,0 +1,259 @@
import threading
import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow, call_coze_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ==============================主程序===========================
def process_link(link, ai_service):
    """
    Fetch one article, rewrite it through an AI workflow and save the result.

    The page behind ``link`` is scraped (only Toutiao and WeChat URLs are
    recognised), the article text is sent to the selected workflow, Markdown
    markers are stripped from the answer, the text is checked with
    ``text_detection`` and, when compliant, written to ``ARTICLES_BASE_PATH``.
    Any image URLs found on the page are passed to
    ``download_and_process_images``.

    :param link: URL of the article to process.
    :param ai_service: AI backend to use, either ``"dify"`` or ``"coze"``.
    :return: None. The function returns early (saving nothing) when no title
        could be extracted, the title is longer than 100 characters, the Dify
        path hits a forbidden keyword, Coze reports the article as already
        written, or the rewritten text is not compliant.
    :raises ValueError: if ``ai_service`` is not a supported backend.
    :raises Exception: any other failure is logged and re-raised unchanged.
    """
    from datetime import datetime

    try:
        # Pick the extractor from the URL prefix; unknown hosts yield nothing.
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_w_extract_content(link)
            if title_text == "":
                # Fall back to the second Toutiao extractor.
                title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []

        if title_text == "":
            return
        elif len(title_text) > 100:
            return

        # Database connection settings
        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']

        # Forbidden-keyword check (performed on the title text)
        check_keywords = check_keywords_in_text(title_text)

        # The saved file is named after the leading part of the article text.
        title = extract_content_until_punctuation(article_text).replace("正文:", "")

        print(title)
        print(article_text)

        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)

        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                check_link_insert(host, user, password, database, link)
                return

            # Load the input_data template from the configuration
            input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}')
            try:
                # Parse the template into a dict, then fill in the placeholders
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {
                    "old_article": article_text
                }

            message_content = call_dify_workflow(input_data)
        elif ai_service == "coze":
            logger.info("coze正在处理")
            # Coze is told about forbidden keywords instead of skipping the link.
            weijin = ""
            if check_keywords:
                weijin = "违禁"

            # Load the Coze input_data template from the configuration
            input_data_template_str = CONFIG['Coze'].get('input_data_template',
                                                         '{{"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}}')
            try:
                # Parse the template into a dict, then fill in the placeholders
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text, link=link, weijin=weijin) for k, v in
                              input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"处理 Coze input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {
                    "article": article_text,
                    "link": link,
                    "weijin": weijin
                }

            msg = call_coze_workflow(input_data)
            message_content = msg['article']
            result = msg['result']
            if result == "已经创作过":
                return
        else:
            # Previously this fell through and failed later with an
            # unbound-local error on message_content.
            raise ValueError(f"不支持的AI服务: {ai_service}")

        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)

        finally_article = message_content.replace("正文:", "") + "\n"
        article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt")

        # Strip Markdown markers. The substitution runs on finally_article so
        # the "正文:" prefix removal is kept whether or not markers are present
        # (the old code substituted on the raw message_content and lost it).
        old_content = re.sub(r'[*#-]', '', finally_article)

        print("改写完成的文章:" + old_content)

        content = old_content

        # Record the link as processed before the compliance check.
        check_link_insert(host, user, password, database, link)

        # Compliance check on the rewritten article
        if text_detection(content) == "合规":
            print("文章合规")
        else:
            print("文章不合规")
            return

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(content)
        logging.info('文本已经保存')

        if img_urls:
            download_and_process_images(img_urls, title)

    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
def link_to_text(prompt1=None, prompt2=None, num_threads=None, ai_service="dify"):
    """
    Process every not-yet-seen link from the Excel sheet and log the successes.

    Links are read with ``read_excel(TITLE_BASE_PATH)``, links already present
    in the database are dropped, the remainder is processed by
    ``process_links_with_threads`` and each successfully processed link is
    appended to ``use_link_path.txt``.

    :param prompt1: unused; kept for backward compatibility with callers.
    :param prompt2: unused; kept for backward compatibility with callers.
    :param num_threads: worker thread count forwarded to
        ``process_links_with_threads`` (``None`` lets it choose).
    :param ai_service: AI backend name forwarded to the workers.
    :return: list of ``(link, success, error)`` tuples, or ``[]`` when there
        is nothing new to process.
    """
    use_link_path = 'use_link_path.txt'

    # Read the links
    links = read_excel(TITLE_BASE_PATH)

    host = CONFIG['Database']['host']
    user = CONFIG['Database']['user']
    password = CONFIG['Database']['password']
    database = CONFIG['Database']['database']

    # Log the total once; it used to be logged again for every single link.
    logging.info(f"总共{len(links)}个链接")

    # Keep only the links that are not in the database yet
    filtered_links = []
    for link in links:
        if check_link_exists(host, user, password, database, link):
            logger.info(f"链接已存在: {link}")
            continue
        filtered_links.append(link)
        logger.info(f"链接不存在: {link}")
        print("链接不存在,存储到过滤器中:", link)

    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    # Process the links with worker threads
    results = process_links_with_threads(filtered_links, num_threads, ai_service)

    # Record the links that were processed successfully
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                f.write(link + "\n")

    return results
# Module-level queues shared by all worker threads:
# task_queue carries the links to process (None is used as the stop signal),
# result_queue carries (link, success, error) tuples back to the caller.
task_queue = queue.Queue()
result_queue = queue.Queue()
# Worker thread entry point
def worker(ai_service):
    """
    Consume links from ``task_queue`` until a ``None`` sentinel is received.

    Every link is handed to ``process_link``; the outcome is reported on
    ``result_queue`` as ``(link, True, None)`` on success or
    ``(link, False, str(error))`` on failure. Failures never terminate the
    thread.

    :param ai_service: AI backend name forwarded to ``process_link``.
    :return: None
    """
    while True:
        try:
            # Take the next task from the queue
            link = task_queue.get()
            try:
                if link is None:  # stop signal
                    break

                try:
                    logger.info(f"开始处理链接:{link}")
                    process_link(link, ai_service)
                    result_queue.put((link, True, None))  # success
                except Exception as e:
                    result_queue.put((link, False, str(e)))  # failure
                    logger.error(f"处理链接 {link} 时出错: {e}")
            finally:
                # Acknowledge every item taken, including the sentinel, so
                # that task_queue.join() can return if a caller ever uses it.
                task_queue.task_done()
        except Exception as e:
            logger.error(f"工作线程出错: {e}")
# Process links with a pool of worker threads
def process_links_with_threads(links, num_threads=None, ai_service="dify"):
    """
    Run ``worker`` threads over ``links`` and collect their results.

    :param links: links to process.
    :param num_threads: requested number of threads; ``None`` means
        ``MAX_THREADS``. The effective count is capped by ``MAX_THREADS`` and
        ``len(links)`` and is never lower than 1 while there are links.
    :param ai_service: AI backend name passed to every worker.
    :return: list of ``(link, success, error)`` tuples in completion order.
    """
    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(num_threads, MAX_THREADS, len(links))
    if links:
        # A zero or negative request used to start no worker at all: the
        # links were queued, never processed, and [] was returned.
        num_threads = max(1, num_threads)

    # Drop leftovers from a previous run
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()

    # Start the workers, handing each the AI service choice
    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service,))
        t.daemon = True
        t.start()
        threads.append(t)

    # Queue the tasks
    for link in links:
        task_queue.put(link)

    # One stop signal per worker
    for _ in range(num_threads):
        task_queue.put(None)

    # Wait for all workers to finish
    for t in threads:
        t.join()

    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results

View File

@ -1,11 +1,20 @@
from get_web_content import wechat_extract_content,toutiao_w_extract_content,toutiao_extract_content import json
import requests
from bs4 import BeautifulSoup
from get_web_content import wechat_extract_content, toutiao_w_extract_content, toutiao_extract_content
from utils import handle_duplicate_files_advanced
title,article,imgs = wechat_extract_content("https://mp.weixin.qq.com/s/3KejJOMuY2y6LA5k1tNwcg") # title,article,imgs = wechat_extract_content("https://mp.weixin.qq.com/s/3KejJOMuY2y6LA5k1tNwcg")
# title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7491890368917602825/?log_from=ab01481cf63ba_1744526333347") # title,article,imgs = toutiao_w_extract_content("https://www.toutiao.com/w/1830082267985932/")
# title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7496132108239356479/")
# print(imgs)
# print(type(imgs))
print("title:",title)
print("article",article) name = handle_duplicate_files_advanced(r"F:\work\code\python\ArticleReplaceBatch\articles","exeample.txt")
print(name[0])
print("imgs",imgs)

View File

@ -0,0 +1,340 @@
import PySimpleGUI as sg
import json
import os
import random
import re
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.enum.style import WD_STYLE_TYPE
from docx import Document
from docx.shared import Inches
from PIL import Image
# JSON file in which the selected folder paths are persisted
# (a relative path, so it is resolved against the current working directory)
SETTINGS_FILE = 'settings.json'
def set_picture_wrapping(paragraph):
    """
    Append a ``w:framePr`` element to the paragraph properties.

    The frame is configured with ``wrap="around"`` and text-relative vertical
    and horizontal anchors.
    NOTE(review): the original comment called this "top and bottom" wrapping,
    but the value written is 'around' - confirm which one is intended.

    :param paragraph: paragraph whose properties receive the frame element.
    :return: None
    """
    paragraph_props = paragraph._element.get_or_add_pPr()
    frame_props = OxmlElement('w:framePr')
    frame_settings = (
        ('w:wrap', 'around'),
        ('w:vAnchor', 'text'),
        ('w:hAnchor', 'text'),
    )
    for attr_name, attr_value in frame_settings:
        frame_props.set(qn(attr_name), attr_value)
    paragraph_props.append(frame_props)
def format_word_document(input_filename, output_filename):
    """
    Apply the house heading/body styles to a .docx file and save the result.

    Paragraphs whose style name starts with ``Heading`` get the
    ``CustomHeading`` style, every other paragraph gets ``CustomBody``.

    :param input_filename: path of the document to read.
    :param output_filename: path the formatted document is written to (may be
        the same as ``input_filename``).
    :return: None
    """
    doc = Document(input_filename)

    def _get_or_add_style(name):
        # add_style raises ValueError when the name is already taken, which
        # happened whenever an already formatted document was processed again.
        if name in doc.styles:
            return doc.styles[name]
        return doc.styles.add_style(name, WD_STYLE_TYPE.PARAGRAPH)

    # Create or update the heading style
    style = _get_or_add_style('CustomHeading')
    style.font.name = '黑体'
    style.font.size = Pt(22)
    style.font.color.rgb = RGBColor(0, 0, 255)  # blue
    style.paragraph_format.space_after = Pt(12)  # spacing after the heading

    # Create or update the body style
    style = _get_or_add_style('CustomBody')
    style.font.name = '仿宋'
    style.font.size = Pt(14)
    style.paragraph_format.first_line_indent = Pt(20)  # first-line indent
    style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
    style.paragraph_format.line_spacing = 1.5
    style.paragraph_format.space_before = Pt(6)
    style.paragraph_format.space_after = Pt(6)

    # Restyle every paragraph
    for paragraph in doc.paragraphs:
        if paragraph.style.name.startswith('Heading'):
            paragraph.style = doc.styles['CustomHeading']
        else:
            paragraph.style = doc.styles['CustomBody']

    # Picture paragraphs: the scan does not depend on the individual image
    # relationship, so it runs once instead of once per relationship.
    has_images = any("image" in rel.target_ref for rel in doc.part.rels.values())
    if has_images:
        for paragraph in doc.paragraphs:
            for run in paragraph.runs:
                # NOTE(review): this tests the tag of the run element itself;
                # confirm that it can ever end with '}pict'.
                if run._element.tag.endswith('}pict'):
                    # Centre the picture and set its wrapping and spacing
                    paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    set_picture_wrapping(paragraph)
                    paragraph.paragraph_format.space_before = Pt(12)
                    paragraph.paragraph_format.space_after = Pt(12)

    # Save the document
    doc.save(output_filename)
def crop_and_replace_images(folder_path, crop_ratio=0.2):
    """
    Crop the bottom part off every .jpg/.png in a folder and save it as PNG.

    The folder is created when it does not exist yet. For each image the
    lowest ``crop_ratio`` share of the height is removed and the result is
    written next to the source as ``<name>.png`` (a .png source is therefore
    overwritten, a .jpg source is kept alongside the new file).

    :param folder_path: folder to scan; surrounding whitespace is ignored.
    :param crop_ratio: share of the image height removed from the bottom
        (default 0.2, the value that used to be hard-coded).
    :return: None
    """
    folder_path = folder_path.strip()
    if not os.path.exists(folder_path):
        os.makedirs(folder_path, exist_ok=True)
        return

    for filename in os.listdir(folder_path):
        # Only image files are handled
        if not filename.lower().endswith(('.jpg', '.png')):
            continue
        file_path = os.path.join(folder_path, filename)
        # The old check tested the bare file name against the working
        # directory, so no image was ever processed.
        if not os.path.isfile(file_path):
            continue
        print("文件夹路径:" + folder_path)
        print("文件路径:" + file_path)
        with Image.open(file_path) as img:
            width, height = img.size
            cropped_img = img.crop((0, 0, width, round(height * (1 - crop_ratio))))
        # splitext only touches the real extension; find('.') used to cut the
        # path at the first dot, even one inside a directory name.
        output_path = os.path.splitext(file_path)[0] + '.png'
        cropped_img.save(output_path, 'PNG')
def split_text_into_paragraphs(text):
    """
    Split text on blank lines and interleave the pieces with empty strings.

    :param text: input text; paragraphs are separated by ``'\\n\\n'``.
    :return: list of the non-blank paragraphs with one ``''`` entry between
        each neighbouring pair (no leading or trailing ``''``).
    """
    # Drop pieces that are empty or whitespace-only
    chunks = [chunk for chunk in text.split('\n\n') if chunk.strip()]

    spaced = []
    for position, chunk in enumerate(chunks):
        if position:
            # Separator goes in front of every paragraph except the first
            spaced.append('')
        spaced.append(chunk)
    return spaced
def insert_images_into_paragraphs(paragraphs, image_folder, doc, title):
    """
    Add the paragraphs to the document, placing one image after each of them.

    Images are the files in ``image_folder`` whose lower-cased name ends with
    ``jpg``, taken in sorted order; once they run out the remaining
    paragraphs are added without pictures.

    :param paragraphs: paragraph strings; a "正文:" marker is removed.
    :param image_folder: folder holding the images; it may be missing, in
        which case only the text is added.
    :param doc: document object providing ``add_paragraph`` and
        ``add_picture``.
    :param title: unused; kept so existing callers keep working.
    :return: None
    """
    # Listing the folder used to happen before the existence check, so a
    # missing image folder raised instead of producing a text-only document.
    images = []
    if os.path.isdir(image_folder):
        images = sorted(os.path.join(image_folder, img) for img in os.listdir(image_folder)
                        if img.lower().endswith('jpg'))

    image_index = 0
    for paragraph in paragraphs:
        if "正文:" in paragraph:
            paragraph = paragraph.replace("正文:", '')
        doc.add_paragraph(paragraph)

        if image_index >= len(images):
            continue

        img_path = images[image_index]
        # Always move on to the next image: a broken file used to be retried
        # after every paragraph and blocked all images behind it.
        image_index += 1

        if not os.path.exists(img_path):
            print(f"图片路径无效: {img_path}")
            continue
        try:
            with Image.open(img_path) as img:
                width, height = img.size
            doc.add_picture(img_path, width=Inches(width / height * 1.5))
        except Exception as e:
            print(f"无法识别图像: {img_path}, 错误: {e}")
def create_word_document(text, image_folder, output_path, title):
    """
    Build a Word document from text and images, then format it in place.

    :param text: article text, split into paragraphs on blank lines.
    :param image_folder: folder whose images are inserted between paragraphs.
    :param output_path: path of the .docx file to write.
    :param title: forwarded to ``insert_images_into_paragraphs``.
    :return: None
    """
    document = Document()
    insert_images_into_paragraphs(
        split_text_into_paragraphs(text), image_folder, document, title
    )
    document.save(output_path)
    # Re-open the saved file and apply the heading/body styles to it.
    format_word_document(output_path, output_path)
    print(f'文档已保存到: {output_path}')
# Read the content of a text file
def read_text_file(file_path):
    """
    Return the whole content of a UTF-8 encoded text file.

    :param file_path: path of the file to read.
    :return: the file content as a string.
    """
    with open(file_path, 'r', encoding='utf-8') as source:
        content = source.read()
    return content
def get_file_name(file_path):
    """
    Return the last component of a path.

    :param file_path: path to take the file name from.
    :return: the file name including its extension.
    """
    _directory, name = os.path.split(file_path)
    return name
def apply_random_style(paragraph):
    """
    Give every run of the paragraph one randomly chosen emphasis.

    The emphasis is one of bold, italic, single underline, a font colour from
    a fixed palette, or a light background colour from a fixed palette.

    :param paragraph: paragraph whose runs are styled in place.
    :return: None
    """
    # Palette for the font colour
    predefined_font_colors = [
        RGBColor(255, 0, 0),  # red
        RGBColor(255, 165, 0),  # orange
        RGBColor(128, 0, 128),  # purple
    ]

    # Palette for the background, picked by hand to be neither too bright
    # nor too dark
    predefined_bg_colors = [
        RGBColor(240, 240, 240),  # light grey
        RGBColor(255, 255, 224),  # light yellow
        RGBColor(224, 255, 224),  # light green
        RGBColor(224, 255, 255),  # light cyan
        RGBColor(255, 228, 225),  # light pink
        RGBColor(240, 248, 255),  # light blue
    ]

    # Each run is one stretch of uniformly formatted text
    for run in paragraph.runs:
        style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background'])

        if style_choice == 'bold':
            run.bold = True
        elif style_choice == 'italic':
            run.italic = True
        elif style_choice == 'underline':
            run.underline = WD_UNDERLINE.SINGLE
        elif style_choice == 'color':
            run.font.color.rgb = random.choice(predefined_font_colors)
        elif style_choice == 'background':
            # font.color has no highlight_color attribute, so the old
            # assignment never coloured anything. Font.highlight_color only
            # takes fixed palette entries, so an arbitrary RGB background is
            # written as run shading (w:shd) instead.
            shading = OxmlElement('w:shd')
            shading.set(qn('w:val'), 'clear')
            shading.set(qn('w:color'), 'auto')
            # str(RGBColor) is the six-digit hex form used by w:fill
            shading.set(qn('w:fill'), str(random.choice(predefined_bg_colors)))
            run._element.get_or_add_rPr().append(shading)
def txt2docx(txt_path, image_path, keep_txt=True):
    """
    Convert every .txt article in a folder into a .docx file next to it.

    For ``<name>.txt`` the images are looked up in ``image_path/<name>`` and
    the document is written to ``<name>.docx`` in ``txt_path``.

    :param txt_path: folder containing the .txt articles.
    :param image_path: folder containing one image sub-folder per article.
    :param keep_txt: when False the source .txt is deleted after conversion.
    :return: None
    """
    # Require a real ".txt" extension: the old suffix test 'txt' also matched
    # names without it, for which the .docx path equalled the source path.
    txts = sorted(os.path.join(txt_path, name) for name in os.listdir(txt_path)
                  if name.lower().endswith('.txt'))

    for txt in txts:
        print("正在修改:" + txt)
        text = read_text_file(txt)

        # splitext strips only the extension; str.replace(".txt", ...) also
        # rewrote ".txt" occurring elsewhere in the path or name.
        stem = os.path.splitext(txt)[0]
        title_name = os.path.basename(stem)
        print(title_name)

        if "正文:" in text:
            # maxsplit=1 keeps everything after the first marker; the old
            # split dropped whatever followed a second marker.
            new_text = text.split('正文:', 1)[1]
        else:
            new_text = text
        content = new_text.replace("```markdown", "").replace("```", "")

        # os.path.join instead of a hard-coded backslash separator
        image_folder = os.path.join(image_path, title_name.rstrip("."))

        create_word_document(content, image_folder, stem + ".docx", title_name)

        # Delete or keep the source file as requested
        if not keep_txt:
            os.remove(txt)
            print(f"已删除原始文件: {txt}")
        else:
            print(f"保留原始文件: {txt}")
# Load the settings
def load_settings():
    """
    Read the saved settings from ``SETTINGS_FILE``.

    :return: dict that always contains ``folder1`` and ``folder2`` (empty
        strings by default) plus whatever else was saved. The defaults are
        returned when the file is missing, unreadable or not a JSON object.
    """
    defaults = {'folder1': '', 'folder2': ''}
    try:
        with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return defaults
    except (OSError, ValueError):
        # json.JSONDecodeError is a ValueError: a damaged settings file must
        # not stop the program from starting.
        return defaults
    if not isinstance(loaded, dict):
        return defaults
    # Fill in keys missing from older or hand-edited files
    return {**defaults, **loaded}
# Save the settings
def save_settings(settings):
    """
    Write the settings to ``SETTINGS_FILE`` as JSON, replacing its content.

    :param settings: JSON-serialisable settings object.
    :return: None
    """
    with open(SETTINGS_FILE, 'w') as settings_file:
        json.dump(settings, fp=settings_file)
# Handle the folders chosen by the user
def process_folders(folder1, folder2, keep_txt=True):
    """
    Convert the articles of the chosen folders by delegating to ``txt2docx``.

    :param folder1: folder with the .txt articles.
    :param folder2: folder with the image sub-folders.
    :param keep_txt: whether the source .txt files are kept.
    :return: None
    """
    txt2docx(txt_path=folder1, image_path=folder2, keep_txt=keep_txt)
# Script entry: everything below runs at import time and opens the GUI.
# Load the previously saved settings
settings = load_settings()
# Default to keeping the .txt files when the option was never saved
if 'keep_txt' not in settings:
settings['keep_txt'] = True
# Window layout: two folder pickers, the keep-txt checkbox and the buttons
layout = [
[sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()],
[sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()],
[sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')],
[sg.Button('确认'), sg.Button('取消')]
]
# Create the window
window = sg.Window('文件夹选择窗口', layout)
# Event loop
while True:
event, values = window.read()
if event == sg.WIN_CLOSED or event == '取消': # window closed or the cancel button clicked
break
elif event == '确认': # the confirm button clicked
# NOTE(review): 0 and 1 are presumably the keys auto-assigned to the two
# Input elements above (they have no explicit key) - confirm.
folder1 = values[0]
folder2 = values[1]
keep_txt = values['keep_txt']
process_folders(folder1, folder2, keep_txt)
# Persist the chosen folders and the keep-txt option
settings['folder1'] = folder1
settings['folder2'] = folder2
settings['keep_txt'] = keep_txt
save_settings(settings)
# Close the window
window.close()

View File

@ -99,3 +99,45 @@ def read_excel(file_name):
return first_colunm_data return first_colunm_data
from typing import Tuple
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Pick a file name that does not collide with an existing file.

    When ``filename`` is free it is returned unchanged. Otherwise the result
    is ``<base>_<n><ext>`` where ``n`` is one more than the highest number
    already used by ``<base><ext>`` / ``<base>_<k><ext>`` in the folder.

    Args:
        folder_path: folder the file is going to be stored in.
        filename: desired file name, including its extension.

    Returns:
        Tuple[str, bool]: ``(file name to use, True if it was renamed)``.
        Note that a tuple is returned, not a bare string.
    """
    base, ext = os.path.splitext(filename)

    if not os.path.exists(os.path.join(folder_path, filename)):
        return filename, False

    pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))

    # Numbers already taken; the unsuffixed name counts as 0
    used_numbers = [0]
    for entry in os.listdir(folder_path):
        match = pattern.match(entry)
        if match:
            used_numbers.append(int(match.group(2)) if match.group(2) else 0)

    next_num = max(used_numbers) + 1
    new_filename = f"{base}_{next_num}{ext}"

    # Re-check against the file system itself: the old loop compared with the
    # directory snapshot taken above, which by construction never contains
    # the candidate, so it could not catch a file created in the meantime.
    while os.path.exists(os.path.join(folder_path, new_filename)):
        next_num += 1
        new_filename = f"{base}_{next_num}{ext}"

    return new_filename, True