Compare commits

...

No commits in common. "master" and "main" have entirely different histories.
master ... main

21 changed files with 36645 additions and 26 deletions

File diff suppressed because it is too large. Load Diff

View File

@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
# PyInstaller spec file for ArticleReplaceDifyBatchWTT.
# Build with:  pyinstaller ArticleReplaceDifyBatchWTT.spec
# NOTE: Analysis/PYZ/EXE are injected by PyInstaller when it executes this
# file; it is not meant to run under a plain Python interpreter.

# Scan the entry script and collect its module and data dependencies.
a = Analysis(
    ['ArticleReplaceDifyBatchWTT.py'],
    pathex=[],            # extra module search paths (none needed)
    binaries=[],          # extra binary files to bundle
    datas=[],             # extra data files to bundle
    hiddenimports=[],     # modules PyInstaller cannot discover statically
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    noarchive=False,
    optimize=0,           # bytecode optimization level (0 = none)
)
# Archive of the collected pure-Python modules.
pyz = PYZ(a.pure)
# Single-file console executable.
exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.datas,
    [],
    name='ArticleReplaceDifyBatchWTT',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,             # compress with UPX when available
    upx_exclude=[],
    runtime_tmpdir=None,
    console=True,         # keep a console window so stdout/stderr are visible
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)

View File

@ -0,0 +1,173 @@
import json
import requests
from config import *
# ========================== Dify workflow invocation ===============================================
def call_dify_workflow(input_data):
    """
    Call a Dify workflow (blocking mode) and return the generated article.

    :param input_data: dict of inputs forwarded to the workflow
    :return: the workflow's ``article`` output (str)
    :raises requests.RequestException: on network failure or timeout
    :raises KeyError: if the response payload has no data/outputs/article
    """
    logger.info("Dify开始工作。。。")
    api_key = CONFIG['Dify']['api_key']
    user_id = CONFIG['Dify']['user_id']
    url = CONFIG['Dify']['url']
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    data = {
        "inputs": input_data,
        "response_mode": "blocking",  # wait for the workflow to finish
        "user": user_id
    }
    # json= serializes the payload for us; the explicit timeout prevents a
    # stalled API call from hanging a worker thread forever.
    response = requests.post(url, headers=headers, json=data, timeout=300)
    # response.json() decodes using the declared charset, unlike the previous
    # manual json.loads(response.text) round-trip.
    json_data = response.json()
    print("json_data:", json_data)
    # extract the generated article from the workflow outputs
    article = json_data['data']['outputs']['article']
    return article
# ========================== Coze workflow invocation ==========================
def call_coze_workflow(parameters):
    """
    Call a Coze workflow and return the raw response body.

    :param parameters: dict of workflow input parameters
    :return: raw response text on HTTP 200, otherwise a dict with
             ``error``/``detail`` keys describing the failure
    :raises requests.RequestException: on network failure or timeout
    """
    logger.info("Coze开始工作。。。。")
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    # config values are strings, so compare against 'true' to get a bool
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    # explicit timeout so a dead endpoint cannot hang a worker thread
    response = requests.post(url, json=data, headers=headers, timeout=300)
    if response.status_code == 200:
        return response.text
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }
def call_coze_article_workflow(parameters):
    """
    Call a Coze workflow and return the ``output`` field of its result.

    :param parameters: dict of workflow input parameters
    :return: the workflow's ``output`` value on success, otherwise a dict
             with ``error``/``detail`` keys describing the HTTP failure
    :raises requests.RequestException: on network failure or timeout
    :raises (json.JSONDecodeError, KeyError): if the response shape is unexpected
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    # config values are strings, so compare against 'true' to get a bool
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=data, headers=headers, timeout=300)
    if response.status_code == 200:
        # The API returns JSON whose 'data' field is itself a JSON string.
        # json.loads handles true/false/null, which ast.literal_eval (used
        # previously) cannot parse and would crash on.
        result_dict = json.loads(response.text)
        data_dict = json.loads(result_dict['data'])
        output_value = data_dict['output']
        return output_value
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }
def call_coze_all_article_workflow(parameters, is_async=False):
    """
    Call a Coze workflow and return the generated title and article.

    :param parameters: dict of workflow input parameters
    :param is_async: whether to run the workflow asynchronously (default False).
        Previously this parameter was shadowed by
        ``CONFIG['Coze']['is_async'].lower() == 'False'`` — a comparison that
        is always False because ``lower()`` can never yield ``'False'`` — so
        the effective value was always False.  The parameter is now honored;
        the default preserves the old behavior.
    :return: (title, article) tuple on success, otherwise a dict with
             ``error``/``detail`` keys describing the HTTP failure
    :raises requests.RequestException: on network failure or timeout
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }
    response = requests.post(url, json=data, headers=headers, timeout=300)
    if response.status_code == 200:
        # The API returns JSON whose 'data' field is itself a JSON string;
        # json.loads handles true/false/null, which ast.literal_eval cannot.
        result_dict = json.loads(response.text)
        print(result_dict)
        data_dict = json.loads(result_dict['data'])
        title = data_dict['title']
        article = data_dict['article']
        return title, article
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,52 @@
[General]
chrome_user_dir = C:\Users\taiyi\AppData\Local\Google\Chrome\User Data
articles_path = articles
images_path = picture
title_file = 文章链接.xlsx
max_threads = 10
[Database]
host = 27.106.125.150
user = root
password = taiyi.1224
database = toutiao
[Dify]
api_key = app-87gssUKFBs9BwJw4m95uUcyF
user_id = toutiao
url = http://27.106.125.150/v1/workflows/run
input_data_template = {"old_article": "{article_text}"}
[Baidu]
api_key = 6GvuZoSEe4L8I7O3p7tZRKhj
secret_key = jDujU3MyzP34cUuTP0GNtPejlQpUFWvl
[ImageModify]
crop_percent = 0.02
min_rotation = 0.02
max_rotation = 0.03
min_brightness = 0.95
max_brightness = 1.09
watermark_text =
watermark_opacity = 128
overlay_opacity = 1
[Keywords]
banned_words = 珠海,落马,股票,股市,股民,爆炸,火灾,死亡,抢劫,诈骗,习大大,习近平,政府,官员,扫黑,警察,落网,嫌疑人,通报,暴力执法,执法,暴力,气象,天气,暴雨,大雨
[Coze]
workflow_id = 7509764025128845366
access_token = pat_EwqZIrV7Y2DHadDWnqkk0k9YxDUxyjIEAfHZFAfnT97mUCexhoTl6McQq3O7mmI8
is_async = true
input_data_template = {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}
gen_type = 短篇
templates = {"短篇": {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}", "type":"短篇"}, "文章": {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}", "type":"文章"}}
short_templates = {"1": {"workflow_id": "750191704079481242511231123", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "false", "input_data_template": "{\"title\": \"{title_text}\"}"}}
article_templates = {}
last_used_template = 2. 万能
last_used_template_type = 文章
[Templates]
templates_短篇 = [{"name": "11", "type": "短篇", "workflow_id": "7501917040794812425", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "false", "input_data_template": "{\"title\": \"{title_text}\"}"}, {"name": "123", "workflow_id": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}]
templates_文章 = [{"name": "1. 情感", "workflow_id": "7520933385113141298", "access_token": "pat_e6f7xXY1Oi8fPxGnBuUV2ed4M8uEdZ4KL6Ncn4359cP5lL6ARCTJg5bVyE4YZxoL", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}, {"name": "2. 万能", "workflow_id": "7509764025128845366", "access_token": "pat_EwqZIrV7Y2DHadDWnqkk0k9YxDUxyjIEAfHZFAfnT97mUCexhoTl6McQq3O7mmI8", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}, {"name": "3.七天", "workflow_id": "752935212125560838344", "access_token": "pat_nFWBkrbtjdaQaqKsfmfrYxlhTkjni8QdqE23xPN3V2Bn4sbMQQz24pHuqJzEc3Tm", "is_async": "true"}]

View File

@ -0,0 +1,121 @@
import configparser
import getpass
import logging
import os
# Path of the INI configuration file (created on first run, see load_config()).
CONFIG_FILE = "config.ini"
# Default configuration: used both to create config.ini on first run and to
# back-fill options missing from an existing file.  All values are strings
# because configparser stores strings only.
DEFAULT_CONFIG = {
    "General": {
        # Chrome profile directory of the current Windows user
        "chrome_user_dir": f"C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data",
        "articles_path": "articles",       # output dir for rewritten articles
        "images_path": "picture",          # output dir for processed images
        "title_file": "文章链接.xlsx",      # Excel sheet with source links
        "max_threads": "3"                 # worker-pool cap
    },
    "Coze": {
        "workflow_id": "",
        "access_token": "",
        "is_async": "false",
        "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
        "last_used_template": "",
        "last_used_template_type": "文章"
    },
    "Database": {
        "host": "27.106.125.150",
        "user": "root",
        "password": "taiyi.1224",
        "database": "toutiao"
    },
    "Dify": {
        "api_key": "app-87gssUKFBs9BwJw4m95uUcyF",
        "user_id": "toutiao",
        "url": "http://27.106.125.150/v1/workflows/run"
    },
    "Baidu": {
        "api_key": "6GvuZoSEe4L8I7O3p7tZRKhj",
        "secret_key": "jDujU3MyzP34cUuTP0GNtPejlQpUFWvl"
    },
    "ImageModify": {
        # image de-duplication knobs (see images module): crop ratio,
        # rotation range in degrees, brightness factor range, watermark
        # text/alpha and overlay alpha (0-255)
        "crop_percent": "0.02",
        "min_rotation": "0.3",
        "max_rotation": "3.0",
        "min_brightness": "0.8",
        "max_brightness": "1.2",
        "watermark_text": "Qin Quan Shan Chu",
        "watermark_opacity": "128",
        "overlay_opacity": "30"
    },
    "Keywords": {
        # comma-separated banned words; articles containing one are skipped
        "banned_words": "珠海,落马,股票,股市,股民,爆炸,火灾,死亡,抢劫,诈骗,习大大,习近平,政府,官员,扫黑,警察,落网,嫌疑人,通报,暴力执法,执法,暴力,气象,天气,暴雨,大雨"
    }
}
# ---- configuration loading ----
def load_config():
    """
    Load config.ini, creating it from DEFAULT_CONFIG when absent and
    back-filling any sections/options missing from an existing file.

    :return: a populated configparser.ConfigParser instance
    """
    config = configparser.ConfigParser()
    if not os.path.exists(CONFIG_FILE):
        # First run: materialize the defaults on disk.
        for section, options in DEFAULT_CONFIG.items():
            config[section] = options
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            config.write(f)
    else:
        config.read(CONFIG_FILE, encoding='utf-8')
        # Back-fill options introduced by newer program versions.
        updated = False
        for section, options in DEFAULT_CONFIG.items():
            if not config.has_section(section):
                config[section] = {}
                updated = True
            for option, value in options.items():
                if not config.has_option(section, option):
                    config[section][option] = value
                    updated = True
        # Only rewrite the file when something was actually missing, instead
        # of rewriting it unconditionally on every start-up.
        if updated:
            with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
                config.write(f)
    return config
# ---- configuration saving ----
def save_config(config):
    """Persist *config* back to CONFIG_FILE as UTF-8 INI text."""
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        config.write(f)
# Load (or create) the configuration once at import time.
CONFIG = load_config()
# Module-level globals mirrored from the [General] section for convenience.
USER_DIR_PATH = CONFIG['General']['chrome_user_dir']     # Chrome profile dir
ARTICLES_BASE_PATH = CONFIG['General']['articles_path']  # rewritten-article output dir
IMGS_BASE_PATH = CONFIG['General']['images_path']        # processed-image output dir
TITLE_BASE_PATH = CONFIG['General']['title_file']        # Excel file with article links
MAX_THREADS = int(CONFIG['General']['max_threads'])      # worker-pool cap
# Create the output directories up front (chmod 777 — presumably for shared
# hosts; NOTE(review): world-writable permissions are very permissive, confirm
# this is intended).
if not os.path.exists(ARTICLES_BASE_PATH):
    os.makedirs(ARTICLES_BASE_PATH)
    os.chmod(ARTICLES_BASE_PATH, 0o777)
if not os.path.exists(IMGS_BASE_PATH):
    os.makedirs(IMGS_BASE_PATH)
    os.chmod(IMGS_BASE_PATH, 0o777)
# Log to both a UTF-8 file and the console.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("article_replace.log", encoding='utf-8'),
                        logging.StreamHandler()
                    ])
logger = logging.getLogger(__name__)
# Log file path (kept in sync with the FileHandler configured above).
LOG_FILE = "article_replace.log"

View File

@ -0,0 +1,86 @@
import pymysql
# ============================== database module ===================================
def check_link_exists(host, user, password, database, link):
    """
    Return True when *link* is already present in the ``links`` table.

    :param host: MySQL host address
    :param user: MySQL user name
    :param password: MySQL password
    :param database: database name
    :param link: URL to look up
    :return: True when the link exists; False when it does not or when a
             database error occurred
    """
    connection = None  # initialized so the finally block can test it safely
    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
        )
        with connection.cursor() as cursor:
            # parameterized query: no SQL-injection risk from the link value
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            return cursor.fetchone() is not None
    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return False
    finally:
        if connection:
            connection.close()
def check_link_insert(host, user, password, database, link):
    """
    Check whether *link* is recorded in the ``links`` table and insert it
    when absent.

    :param host: MySQL host address
    :param user: MySQL user name
    :param password: MySQL password
    :param database: database name
    :param link: URL to check and, if new, record
    :return: True when the link already existed; False when it was inserted
             or when a database error occurred
    """
    connection = None  # initialized so the finally block can test it safely
    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
        )
        with connection.cursor() as cursor:
            # parameterized queries throughout: safe against SQL injection
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            if cursor.fetchone():
                # already recorded
                return True
            # first sighting: record the link
            cursor.execute("INSERT INTO links (link) VALUES (%s)", (link,))
            connection.commit()  # persist the insert
            print("链接已插入")
            return False
    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return False
    finally:
        if connection:
            connection.close()

View File

@ -0,0 +1,341 @@
import logging
import os
import random
import requests
from PIL import Image
from PIL import ImageDraw, ImageFont, ImageEnhance
from config import *
from utils import safe_open_directory, safe_filename
IMGS_BASE_PATH = CONFIG['General']['images_path']
def crop_and_replace_images(folder_path):
    """
    Crop the bottom 10% off every .jpg in *folder_path*, stretch it back to
    the original size and overwrite the file in place.

    :param folder_path: directory whose .jpg files are modified
    :return: None
    """
    print("开始处理图片。。。。")
    for filename in os.listdir(folder_path):
        # only plain .jpg files are touched
        if filename.lower().endswith('.jpg'):
            file_path = os.path.join(folder_path, filename)
            print("文件夹路径:" + folder_path)
            print("文件路径:" + file_path)
            with Image.open(file_path) as img:
                width, height = img.size
                print("裁剪图片。。。")
                # drop the bottom 10% (typically a watermark/credit strip)
                cropped_img = img.crop((0, 0, width, height - (height * 0.1)))
                # stretch back so the final size matches the original
                resized_img = cropped_img.resize((width, height))
                # Pillow's registered format name is 'JPEG'; the previous
                # save(..., 'jpg') raises KeyError on every call.
                resized_img.save(file_path, 'JPEG')
def deduplicate_images(folder_path):
    """Walk *folder_path*, run modify_image() on every supported picture and
    overwrite each file in place."""
    print("开始对图片去重。。。")
    if not os.path.exists(folder_path):
        print("错误:输入文件夹不存在!")
        return
    supported_ext = ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.webp')
    for root, _, files in os.walk(folder_path):
        for name in files:
            # skip anything that is not a known image type
            if not name.lower().endswith(supported_ext):
                continue
            file_path = os.path.join(root, name)
            try:
                with Image.open(file_path) as img:
                    reworked = modify_image(img)
                    reworked.save(file_path)  # overwrite the original file
                    print(f"已处理并覆盖:{file_path}")
            except Exception as e:
                print(f"处理 {file_path} 时出错:{e}")
def download_image(image_url, save_path):
    """
    Download one image and write it to *save_path*.

    :param image_url: absolute image URL
    :param save_path: destination file path
    :return: None (failures are reported on stdout, not raised)
    """
    try:
        # explicit timeout so a dead CDN cannot hang the calling thread;
        # a Timeout is a RequestException and lands in the handler below
        response = requests.get(image_url, timeout=60)
        if response.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(response.content)
            print(f"图片下载成功,保存路径为:{save_path}")
        else:
            print(f"图片下载失败,状态码为:{response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"请求出错:{e}")
def download_and_process_images(img_urls, article_title, save_dir=None):
    """
    Download every URL in *img_urls* into a per-article folder and apply
    modify_image() to each downloaded file.

    :param img_urls: list of image URLs (protocol-relative URLs allowed)
    :param article_title: article title, used (sanitized) as the folder name
    :param save_dir: destination root; defaults to IMGS_BASE_PATH when None
    """
    target_root = IMGS_BASE_PATH if save_dir is None else save_dir
    # sanitize the title so it is a legal directory name
    safe_title = safe_filename(article_title)
    # normpath avoids mixed path-separator issues on Windows
    img_dir_path = os.path.normpath(os.path.join(str(target_root), safe_title))
    logger.info(f"图片保存路径:{img_dir_path}")
    safe_open_directory(img_dir_path)
    for i, img_url in enumerate(img_urls):
        # complete protocol-relative URLs ("//host/...") with https:
        full_url = img_url if img_url.startswith("https") else "https:" + img_url
        img_path = os.path.normpath(os.path.join(img_dir_path, f"图片{i}.jpg"))
        try:
            download_image(full_url, img_path)
            # process only the freshly downloaded file, not the whole folder
            with Image.open(img_path) as img:
                reworked = modify_image(img)
                reworked.save(img_path)  # overwrite the original download
            print(f"已处理并覆盖:{img_path}")
        except Exception as e:
            logging.error(f"处理图片失败: {e}")
# def download_and_process_images(img_urls, article_title, save_dir=None):
# """
# 下载并处理图片
# :param img_urls: 图片URL列表
# :param article_title: 文章标题
# :param save_dir: 自定义保存目录如果为None则使用默认目录
# """
# if save_dir is None:
# save_dir = IMGS_BASE_PATH
#
# img_dir_path = os.path.join(str(save_dir), str(article_title))
# logger.info(f"图片保存路径:{img_dir_path}")
# safe_open_directory(img_dir_path)
#
# for i, img_url in enumerate(img_urls):
# if img_url.startswith("https"):
# imgurl = img_url
# else:
# imgurl = "https:"+img_url
# img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
# try:
# download_image(imgurl, img_path)
# # crop_and_replace_images(img_dir_path)
# deduplicate_images(img_dir_path)
# except Exception as e:
# logging.error(f"处理图片失败: {e}")
# def modify_image(img):
# print("修改图片")
# """对图片应用去重处理,不翻转,仅裁剪、旋转、亮度调整、添加水印、加透明蒙版"""
# width, height = img.size
#
# # 从配置中获取参数
# crop_percent = float(CONFIG['ImageModify']['crop_percent'])
# min_rotation = float(CONFIG['ImageModify']['min_rotation'])
# max_rotation = float(CONFIG['ImageModify']['max_rotation'])
# min_brightness = float(CONFIG['ImageModify']['min_brightness'])
# max_brightness = float(CONFIG['ImageModify']['max_brightness'])
# watermark_text = CONFIG['ImageModify']['watermark_text']
# watermark_opacity = int(CONFIG['ImageModify']['watermark_opacity'])
# overlay_opacity = int(CONFIG['ImageModify']['overlay_opacity'])
#
# # 1. 裁剪边缘
# crop_px_w = int(width * crop_percent)
# crop_px_h = int(height * crop_percent)
# img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))
#
# # 2. 随机旋转
# angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
# img = img.rotate(angle, expand=True)
#
# # 3. 调整亮度
# enhancer = ImageEnhance.Brightness(img)
# factor = random.uniform(min_brightness, max_brightness) # 亮度调整因子
# img = enhancer.enhance(factor)
#
# # 4. 添加文字水印
# draw = ImageDraw.Draw(img)
# font_size = max(20, int(min(img.size) * 0.05))
# try:
# font = ImageFont.truetype("arial.ttf", font_size)
# except:
# font = ImageFont.load_default()
#
# # 获取文本尺寸
# text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:]
#
# # 水印放在图片右下角
# x = img.size[0] - text_width - 5
# y = img.size[1] - text_height - 5
# draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, watermark_opacity))
#
# # 5. 添加半透明蒙版
# overlay = Image.new('RGBA', img.size, (255, 255, 255, overlay_opacity))
# if img.mode != 'RGBA':
# img = img.convert('RGBA')
# img = Image.alpha_composite(img, overlay)
#
# return img.convert('RGB')
def modify_image(img):
    """
    Apply the de-duplication pipeline to *img*: crop 20px off the bottom,
    trim the edges, rotate slightly, tweak brightness, stamp a watermark and
    blend in a translucent overlay.  The image is never flipped.

    Args:
        img: PIL.Image to process
    Returns:
        processed PIL.Image in RGB mode
    """
    print("修改图片")
    # normalize to RGB before the pipeline starts
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # read every knob up front so a bad config value fails fast
    cfg = CONFIG['ImageModify']
    crop_percent = float(cfg['crop_percent'])
    min_rotation = float(cfg['min_rotation'])
    max_rotation = float(cfg['max_rotation'])
    min_brightness = float(cfg['min_brightness'])
    max_brightness = float(cfg['max_brightness'])
    watermark_text = cfg['watermark_text']
    watermark_opacity = int(cfg['watermark_opacity'])
    overlay_opacity = int(cfg['overlay_opacity'])
    # 1. remove a fixed 20px strip from the bottom
    img = crop_bottom(img, 20)
    # 2. trim a percentage off every edge
    img = crop_edges(img, crop_percent)
    # 3. small random rotation
    img = random_rotate(img, min_rotation, max_rotation)
    # 4. random brightness tweak
    img = adjust_brightness(img, min_brightness, max_brightness)
    # 5. text watermark in the corner
    img = add_watermark(img, watermark_text, watermark_opacity)
    # 6. translucent white veil over the whole frame
    img = add_overlay(img, overlay_opacity)
    return img.convert('RGB')
def crop_bottom(img, pixels):
    """
    Remove *pixels* rows from the bottom of *img*.

    Args:
        img: PIL.Image to crop
        pixels: number of rows to cut off
    Returns:
        cropped PIL.Image, or the original when it is not tall enough
    """
    width, height = img.size
    # guard: leave short images untouched
    if height <= pixels:
        return img
    return img.crop((0, 0, width, height - pixels))
def crop_edges(img, percent):
    """
    Trim *percent* (0-1) of the width and height off every side of *img*.

    Args:
        img: PIL.Image to crop
        percent: fraction of each dimension to remove per side
    Returns:
        cropped PIL.Image
    """
    width, height = img.size
    dx = int(width * percent)
    dy = int(height * percent)
    return img.crop((dx, dy, width - dx, height - dy))
def random_rotate(img, min_rotation, max_rotation):
    """
    Rotate *img* by a random angle in [min_rotation, max_rotation] degrees,
    with the direction (sign) chosen at random.

    Args:
        img: PIL.Image to rotate
        min_rotation: minimum rotation magnitude in degrees
        max_rotation: maximum rotation magnitude in degrees
    Returns:
        rotated PIL.Image (canvas expanded to fit)
    """
    magnitude = random.uniform(min_rotation, max_rotation)
    direction = random.choice([-1, 1])
    return img.rotate(magnitude * direction, expand=True)
def adjust_brightness(img, min_brightness, max_brightness):
    """
    Scale the brightness of *img* by a random factor drawn from
    [min_brightness, max_brightness].

    Args:
        img: PIL.Image to adjust
        min_brightness: lower bound of the brightness factor
        max_brightness: upper bound of the brightness factor
    Returns:
        brightness-adjusted PIL.Image
    """
    factor = random.uniform(min_brightness, max_brightness)
    return ImageEnhance.Brightness(img).enhance(factor)
def add_watermark(img, text, opacity):
    """
    Stamp *text* near the bottom-right corner of *img*.

    Args:
        img: PIL.Image to watermark (converted to RGBA for alpha support)
        text: watermark text
        opacity: watermark alpha, 0-255
    Returns:
        PIL.Image in RGBA mode with the watermark drawn
    """
    # RGBA is required for the translucent fill below
    if img.mode != 'RGBA':
        img = img.convert('RGBA')
    draw = ImageDraw.Draw(img)
    # font scales with the image but never drops below 20px
    font_size = max(20, int(min(img.size) * 0.05))
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        # narrow handler (was a bare `except:`): truetype raises OSError when
        # the font file cannot be found/read, e.g. on non-Windows hosts
        font = ImageFont.load_default()
    # textbbox returns (left, top, right, bottom); with origin (0, 0) the
    # right/bottom pair is the rendered text size
    text_width, text_height = draw.textbbox((0, 0), text, font=font)[2:]
    # pin to the bottom-right, clamped so tiny images don't get negative coords
    x = max(5, img.size[0] - text_width - 5)
    y = max(5, img.size[1] - text_height - 5)
    draw.text((x, y), text, font=font, fill=(255, 255, 255, opacity))
    return img
def add_overlay(img, opacity):
    """
    Blend a translucent white layer over the whole of *img*.

    Args:
        img: PIL.Image to veil (converted to RGBA for alpha compositing)
        opacity: overlay alpha, 0-255
    Returns:
        PIL.Image in RGBA mode with the overlay composited
    """
    base = img if img.mode == 'RGBA' else img.convert('RGBA')
    veil = Image.new('RGBA', base.size, (255, 255, 255, opacity))
    return Image.alpha_composite(base, veil)

View File

@ -0,0 +1,253 @@
import threading
import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ============================== main pipeline ===========================
def process_link(link_info, ai_service, current_template=None, generation_type=None):
    """
    Fetch one article link, rewrite it through the chosen AI service and save
    the result (text plus processed images) under a per-type directory.

    :param link_info: (link, article_type) tuple
    :param ai_service: "dify" or "coze"
    :param current_template: optional Coze template dict that temporarily
        overrides the [Coze] config section while this link is processed
    :param generation_type: "短篇" (short piece) or "文章" (full article)
    :raises Exception: re-raises any processing error after logging it
    """
    link, article_type = link_info  # unpack the link and its category
    try:
        # pick the scraper that matches the source site
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_w_extract_content(link)
            if title_text == "":
                # fall back to the alternative Toutiao extractor
                title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        elif link.startswith("https://www.163.com"):
            title_text, article_text, img_urls = wangyi_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []
        # skip unscrapable pages and implausibly long titles
        if title_text == "":
            return
        elif len(title_text) > 100:
            return
        # database credentials for the processed-link bookkeeping table
        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']
        # True when the title contains a banned keyword
        check_keywords = check_keywords_in_text(title_text)
        # derive a short title from the article body (strip the "正文:" prefix)
        title = extract_content_until_punctuation(article_text).replace("正文:", "")
        from datetime import datetime
        current_time = datetime.now().strftime("%H:%M:%S")
        print("当前时间:", current_time)
        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                # record the link so it is never retried
                check_link_insert(host, user, password, database, link)
                return
            # build the workflow input from the configurable JSON template
            input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {"old_article": article_text}
            message_content = call_dify_workflow(input_data)
        elif ai_service == "coze":
            logger.info("coze正在处理")
            logger.info(f"正在处理的文章类型为:{generation_type}")
            if current_template:
                # stash the configured Coze credentials so they can be
                # restored in the finally block below
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async']
                }
                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
                logger.info(f"应用模板配置: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")
            try:
                # NOTE(review): the parsed template is not actually used below;
                # the input dicts are built inline — verify this is intended
                input_data_template_str = CONFIG['Coze'].get('input_data_template')
                input_data_template = json.loads(input_data_template_str)
                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("coze中输入", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("原文中标题为:", title_text)
                    print("原文中内容为:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("发送的请求数据为:", input_data)
                    title, message_content = call_coze_all_article_workflow(input_data)
            finally:
                # always restore the global Coze config the template overrode
                if 'original_config' in locals():
                    CONFIG['Coze'].update(original_config)
        # tidy the title and prepare the per-type output directory
        title_text = title_text.strip()
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)
        # pick a collision-free file name inside the category directory
        file_name = ""
        if generation_type == '短篇':
            file_name = handle_duplicate_files_advanced(type_dir, title_text.strip())[0]
        elif generation_type == "文章":
            file_name = handle_duplicate_files_advanced(type_dir, title.strip())[0]
        article_save_path = os.path.join(type_dir, f"{file_name}.txt")
        # run the generated text through the moderation service before saving
        if text_detection(message_content) == "合规":
            print("文章合规")
            pass
        else:
            print("文章不合规")
            return
        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
        logging.info('文本已经保存')
        if img_urls:
            # mirror the per-type layout for the article's images
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)
    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise
def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None):
    """
    Read article links from the configured Excel sheet, process them with a
    thread pool and append the successfully handled entries to
    use_link_path.txt.

    :param num_threads: worker thread count (capped by MAX_THREADS)
    :param ai_service: "dify" or "coze"
    :param current_template: optional Coze template forwarded to process_link
    :param generation_type: default article type when the sheet has none
    :return: list of (task, success, error) result tuples
    """
    use_link_path = 'use_link_path.txt'
    # load (link, type) rows from the configured Excel file
    links = read_excel(TITLE_BASE_PATH)
    filtered_links = []
    host = CONFIG['Database']['host']
    user = CONFIG['Database']['user']
    password = CONFIG['Database']['password']
    database = CONFIG['Database']['database']
    for link_info in links:
        link = link_info[0].strip()  # the URL column
        # prefer the per-row type from Excel; fall back to the caller's type
        article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
        logging.info(f"总共{len(links)}个链接")
        # NOTE(review): the duplicate-link DB check is commented out, so every
        # row is currently queued regardless of processing history.
        # if check_link_exists(host, user, password, database, link):
        #     logger.info(f"链接已存在: {link}")
        #     continue
        # else:
        filtered_links.append((link, article_type))  # keep (link, type) pairs
        # logger.info(f"链接不存在: {link}")
        # print("链接不存在,存储到过滤器中:", link)
    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []
    # fan the links out to the worker threads
    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template, generation_type)
    # append successfully processed entries to the bookkeeping file
    # NOTE(review): each result's first element is the (link, type) tuple the
    # worker received, so `link + "\n"` may need to be `link[0]` — verify.
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                f.write(link + "\n")
    return results
# Shared work queue (link tasks) and result queue (outcome tuples), consumed
# and produced by the worker threads below.
task_queue = queue.Queue()
result_queue = queue.Queue()
# ---- worker thread body ----
def worker(ai_service, current_template=None, generation_type=None):
    """
    Pull tasks from task_queue until a None sentinel arrives, process each
    via process_link() and push (task, success, error) onto result_queue.

    :param ai_service: "dify" or "coze", forwarded to process_link
    :param current_template: optional Coze template, forwarded to process_link
    :param generation_type: article type, forwarded to process_link
    """
    while True:
        try:
            link = task_queue.get()
            if link is None:  # sentinel: shut this worker down
                break
            try:
                logger.info(f"开始处理链接:{link}")
                process_link(link, ai_service, current_template, generation_type)
                result_queue.put((link, True, None))   # success
            except Exception as e:
                result_queue.put((link, False, str(e)))  # failure, keep reason
                logger.error(f"处理链接 {link} 时出错: {e}")
            # mark the queue item as done regardless of outcome
            task_queue.task_done()
        except Exception as e:
            logger.error(f"工作线程出错: {e}")
# ---- multi-threaded dispatch ----
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None, generation_type=None):
    """
    Process *links* with a pool of daemon worker threads.

    :param links: list of (link, article_type) tuples
    :param num_threads: requested pool size; capped at MAX_THREADS and len(links)
    :param ai_service: "dify" or "coze", forwarded to each worker
    :param current_template: optional Coze template, forwarded to each worker
    :param generation_type: article type, forwarded to each worker
    :return: list of (task, success, error) tuples drained from result_queue
    """
    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(num_threads, MAX_THREADS, len(links))
    # drain any stale items left over from a previous run
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()
    threads = []
    # start the workers first; they block on the (still empty) task queue
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service, current_template, generation_type))
        t.daemon = True
        t.start()
        threads.append(t)
    # enqueue the actual work
    for link in links:
        task_queue.put(link)
    # one None sentinel per worker shuts the pool down
    for _ in range(num_threads):
        task_queue.put(None)
    # wait for all workers to finish
    for t in threads:
        t.join()
    # collect whatever the workers reported
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results

Binary file not shown.

View File

@ -0,0 +1,433 @@
import re
import random
import argparse
import sys
import os
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path
import logging
class TextProcessor:
    """Sentence-level text scrambler.

    Splits text on terminal punctuation and randomly swaps one pair of
    adjacent interior characters in every sufficiently long sentence.
    """

    def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
        """
        Create a processor.

        Args:
            min_length: sentences no longer than this are left untouched
            custom_punctuation: regex character class of sentence-ending
                marks; falls back to the built-in set when None/empty
        """
        self.min_length = min_length
        self.sentence_endings = custom_punctuation or r'[?!;]'
        self.statistics = {
            'total_sentences': 0,
            'processed_sentences': 0,
            'total_chars': 0,
            'swapped_chars': 0
        }
        # configure logging for this processor instance
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def split_sentences(self, text: str) -> List[Tuple[str, str]]:
        """
        Split *text* on the configured punctuation, keeping each mark.

        Args:
            text: input text
        Returns:
            list of (sentence, punctuation) pairs; punctuation is '' when a
            sentence ran to the end of the text without a terminator
        """
        if not text.strip():
            return []
        # a capture group makes re.split keep the separators in the result
        pieces = re.split(f'({self.sentence_endings})', text)
        pairs: List[Tuple[str, str]] = []
        idx = 0
        total = len(pieces)
        while idx < total:
            body = pieces[idx].strip()
            if not body:  # skip empty fragments between separators
                idx += 1
                continue
            nxt = idx + 1
            if nxt < total and re.match(self.sentence_endings, pieces[nxt]):
                mark = pieces[nxt]
                idx += 2
            else:
                mark = ''
                idx += 1
            pairs.append((body, mark))
            self.statistics['total_sentences'] += 1
        return pairs

    def swap_random_chars(self, sentence: str) -> str:
        """
        Swap one random pair of adjacent characters in a long sentence.

        Sentences that are empty, at most 3 characters long, or not longer
        than ``min_length`` are returned unchanged; the first and the last
        character are never part of a swap.

        Args:
            sentence: sentence body (without its punctuation)
        Returns:
            the sentence with one adjacent pair swapped, or unchanged
        """
        if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
            return sentence
        chars = list(sentence)
        original_length = len(chars)
        # valid swap pairs are (1,2) ... (n-3, n-2): interior and adjacent
        lo = 1
        hi = len(chars) - 3
        if hi < lo:
            return sentence
        try:
            pick = random.randint(lo, hi)
            chars[pick], chars[pick + 1] = chars[pick + 1], chars[pick]
            self.statistics['processed_sentences'] += 1
            self.statistics['swapped_chars'] += 2
            self.logger.debug(f"交换相邻位置 {pick}{pick + 1},句子长度:{original_length}")
        except (ValueError, IndexError) as e:
            self.logger.warning(f"字符交换失败:{e}")
            return sentence
        return ''.join(chars)

    def process_text(self, text: str) -> str:
        """
        Process *text* paragraph by paragraph and return the scrambled copy.

        Resets the statistics, then splits every non-blank paragraph into
        sentences and applies swap_random_chars() to each sentence body.

        Args:
            text: input text (paragraphs separated by newlines)
        Returns:
            the processed text with paragraph structure preserved
        """
        if not text:
            return text
        # fresh statistics for this run
        self.statistics = {
            'total_sentences': 0,
            'processed_sentences': 0,
            'total_chars': len(text),
            'swapped_chars': 0
        }
        out_paragraphs = []
        for para in text.split('\n'):
            if not para.strip():
                out_paragraphs.append(para)  # keep blank lines verbatim
                continue
            rebuilt = ''.join(
                self.swap_random_chars(body) + mark
                for body, mark in self.split_sentences(para)
            )
            out_paragraphs.append(rebuilt)
        return '\n'.join(out_paragraphs)

    def get_statistics(self) -> Dict[str, Any]:
        """Return a shallow copy of the processing statistics."""
        return self.statistics.copy()

    def print_statistics(self):
        """Pretty-print the statistics gathered by the last process_text() run."""
        stats = self.get_statistics()
        print("\n" + "=" * 50)
        print("处理统计信息:")
        print(f"总字符数:{stats['total_chars']}")
        print(f"总句子数:{stats['total_sentences']}")
        print(f"处理句子数:{stats['processed_sentences']}")
        print(f"交换字符数:{stats['swapped_chars']}")
        if stats['total_sentences'] > 0:
            print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
        print("=" * 50)
class FileHandler:
    """File I/O helper: multi-encoding tolerant reads and safe writes."""

    @staticmethod
    def read_file(filename: str) -> str:
        """
        Read a text file, trying several encodings in order.

        Args:
            filename: path of the file to read
        Returns:
            the file content as str
        Raises:
            FileNotFoundError: the file does not exist
            PermissionError: the file is not readable
            UnicodeDecodeError: no candidate encoding could decode the file
        """
        if not os.path.exists(filename):
            raise FileNotFoundError(f"文件 '{filename}' 不存在")
        if not os.access(filename, os.R_OK):
            raise PermissionError(f"没有读取文件 '{filename}' 的权限")
        # candidate encodings, from strict to permissive (latin-1 never fails)
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
        for encoding in encodings:
            try:
                with open(filename, 'r', encoding=encoding) as f:
                    content = f.read()
                logging.info(f"使用 {encoding} 编码成功读取文件:{filename}")
                return content
            except UnicodeDecodeError:
                continue
        # UnicodeDecodeError requires (encoding, object, start, end, reason);
        # the previous single-string call raised TypeError instead of the
        # intended exception, so callers could never catch it.
        raise UnicodeDecodeError(
            encodings[-1], b"", 0, 1,
            f"无法解码文件 '{filename}',尝试的编码格式:{encodings}"
        )

    @staticmethod
    def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
        """
        Write *content* to *filename*, creating parent directories as needed.

        Args:
            filename: output file path
            content: text to write
            encoding: output encoding (default utf-8)
        Raises:
            PermissionError: the target is not writable
            OSError: other system-level write errors (disk full, ...)
        """
        output_dir = os.path.dirname(filename)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)
        try:
            with open(filename, 'w', encoding=encoding) as f:
                f.write(content)
            logging.info(f"成功写入文件:{filename}")
        except PermissionError:
            # PermissionError is a subclass of OSError, so it must be first
            raise PermissionError(f"没有写入文件 '{filename}' 的权限")
        except OSError as e:
            raise OSError(f"写入文件 '{filename}' 时发生错误:{e}")
def setup_argument_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the sentence-swap text processor.

    Returns:
        argparse.ArgumentParser: parser requiring exactly one input source
        (-f/--file, -t/--text or --stdin) plus processing, output and
        diagnostic options.
    """
    parser = argparse.ArgumentParser(
        description='文本句子字符交换处理器',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
%(prog)s -f input.txt # 处理文件
%(prog)s -t "你的文本内容" # 直接处理文本
%(prog)s -f input.txt -l 20 # 设置长度阈值为20
%(prog)s -f input.txt -o output.txt # 输出到文件
%(prog)s -f input.txt -p "。!?" -s # 自定义标点符号并显示统计
"""
    )
    # Exactly one input source must be chosen.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument('-f', '--file', help='输入文件路径')
    source.add_argument('-t', '--text', help='直接输入文本')
    source.add_argument('--stdin', action='store_true',
                        help='从标准输入读取文本')
    # Remaining options, declared data-driven to keep the wiring in one place.
    option_specs = [
        (('-l', '--length'), dict(type=int, default=30, help='句子长度阈值默认30')),
        (('-p', '--punctuation'), dict(help='自定义标点符号(默认:。!?;?!;')),
        (('-o', '--output'), dict(help='输出文件路径')),
        (('-e', '--encoding'), dict(default='utf-8', help='输出文件编码默认utf-8')),
        (('-s', '--statistics'), dict(action='store_true', help='显示处理统计信息')),
        (('-v', '--verbose'), dict(action='store_true', help='显示详细日志')),
        (('--seed',), dict(type=int, help='随机数种子(用于测试)')),
    ]
    for flags, kwargs in option_specs:
        parser.add_argument(*flags, **kwargs)
    return parser
def main():
    """CLI entry point: parse arguments, read input, process, emit output.

    Exits with status 1 on input/processing errors and 0 on empty input.
    """
    parser = setup_argument_parser()
    args = parser.parse_args()
    # Enable debug logging when requested.
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Fix: compare against None so an explicit `--seed 0` is honored;
    # the old truthiness check silently ignored a zero seed.
    if args.seed is not None:
        random.seed(args.seed)
    # Acquire input text from exactly one source (enforced by argparse).
    try:
        if args.file:
            text = FileHandler.read_file(args.file)
        elif args.text:
            text = args.text
        elif args.stdin:
            text = sys.stdin.read()
        else:
            # Unreachable in practice: the input group is required.
            print("错误:请指定输入源")
            sys.exit(1)
        if not text.strip():
            print("警告:输入文本为空")
            sys.exit(0)
    except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
        print(f"错误:{e}")
        sys.exit(1)
    # Process the text and emit the result.
    try:
        processor = TextProcessor(
            min_length=args.length,
            custom_punctuation=args.punctuation
        )
        processed_text = processor.process_text(text)
        if args.output:
            FileHandler.write_file(args.output, processed_text, args.encoding)
            print(f"处理完成,结果已保存到 '{args.output}'")
        else:
            print("处理结果:")
            print("-" * 50)
            print(processed_text)
        # Optional statistics report.
        if args.statistics:
            processor.print_statistics()
    except Exception as e:
        print(f"处理过程中发生错误:{e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)
# 单元测试
def run_tests():
    """Smoke-test sentence splitting, adjacent-char swapping and edge cases."""
    print("运行单元测试...")
    proc = TextProcessor(min_length=6)
    # Sentence splitting on Chinese terminators.
    sample = "这是第一句。这是第二句!第三句?"
    pieces = proc.split_sentences(sample)
    assert len(pieces) == 3, f"期望3个句子实际{len(pieces)}"
    assert pieces[0] == ("这是第一句", ""), f"第一句解析错误:{pieces[0]}"
    # Adjacent character swap on a long sentence.
    long_sent = "这是一个很长的句子用来测试字符交换功能"
    random.seed(42)  # deterministic seed for reproducibility
    swapped = proc.swap_random_chars(long_sent)
    assert swapped != long_sent, "长句子应该被修改"
    assert len(swapped) == len(long_sent), "交换后长度应该不变"
    # Exactly one adjacent pair should have been exchanged.
    diff_count = sum(1 for a, b in zip(long_sent, swapped) if a != b)
    assert diff_count == 2, f"应该只有2个字符位置发生变化实际{diff_count}"
    # Short sentences are left untouched.
    short_sent = "短句"
    assert proc.swap_random_chars(short_sent) == short_sent, "短句子不应该被修改"
    # Edge case: the empty string passes through unchanged.
    assert proc.swap_random_chars("") == "", "空字符串应该保持不变"
    print("✓ 所有测试通过!")
# 示例使用
def replace_text(text):
    """Dispatch between test mode, CLI mode and demo mode.

    - 'test' as the first CLI argument runs the unit tests and exits.
    - Any other CLI arguments delegate to main() and return None.
    - With no CLI arguments, runs a demo on *text* and returns the result.

    Args:
        text: sample text processed in demo mode.

    Returns:
        str | None: the processed demo text, or None in CLI mode.
    """
    # Check whether the test suite was requested.
    if len(sys.argv) > 1 and sys.argv[1] == 'test':
        run_tests()
        sys.exit(0)
    # CLI mode.
    if len(sys.argv) > 1:
        main()
        # Fix: `processed` was never bound on this path, so the trailing
        # `return processed` raised NameError once main() returned.
        return None
    # Demo mode.
    sample_text = text
    print("示例演示:")
    print("原文:")
    print(sample_text)
    print("\n" + "=" * 50 + "\n")
    processor = TextProcessor(min_length=9)
    processed = processor.process_text(sample_text)
    print("处理后:")
    print(processed)
    processor.print_statistics()
    print("\n使用说明:")
    print("命令行用法:")
    print("  python script.py -f input.txt # 处理文件")
    print("  python script.py -t '你的文本内容' # 直接处理文本")
    print("  python script.py -f input.txt -l 20 # 设置长度阈值为20")
    print("  python script.py -f input.txt -o output.txt # 输出到文件")
    print("  python script.py -f input.txt -p '。!?' -s # 自定义标点符号并显示统计")
    print("  python script.py test # 运行单元测试")
    return processed
# Demo entry point: run the swap processor on a fixed sample string.
text = """QWERTYUIOP"""
result = replace_text(text)
print(result)

View File

@ -0,0 +1,26 @@
from get_web_content import toutiao_w_extract_content
from images_edit import download_and_process_images
def get_img(url):
    """Fetch a Toutiao article, report its parts, and process its images.

    Args:
        url: the Toutiao article URL to extract from.
    """
    title, content, images = toutiao_w_extract_content(url)
    image_total = len(images)
    print(f"标题: {title}")
    print(f"内容长度: {len(content)}")
    print(f"图片数量: {image_total}")
    print("图片URLs:")
    for index, image_url in enumerate(images, 1):
        print(f"{index}. {image_url}")
    # Download and post-process the images into the named folder.
    download_and_process_images(images, "n你好")
# Script entry point: process each article URL in turn.
if __name__ == "__main__":
    urls = ["https://www.toutiao.com/article/7533210726036275755/"
    ]
    # Fix: iterate the list directly instead of index-based range(len(...)).
    for url in urls:
        get_img(url)

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,390 @@
import PySimpleGUI as sg
import json
import os
import random
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.enum.style import WD_STYLE_TYPE
from docx import Document
from docx.shared import Inches
from PIL import Image
# 保存文件路径的 JSON 文件
SETTINGS_FILE = 'settings.json'
def set_picture_wrapping(paragraph):
    """Give the picture's paragraph top-and-bottom text wrapping.

    Appends a ``w:framePr`` element to the paragraph properties so the
    frame wraps 'around' and is anchored to the text both vertically
    and horizontally.

    Args:
        paragraph: the python-docx paragraph holding the picture.
    """
    props = paragraph._element.get_or_add_pPr()
    frame = OxmlElement('w:framePr')
    for attribute, value in (('w:wrap', 'around'),
                             ('w:vAnchor', 'text'),
                             ('w:hAnchor', 'text')):
        frame.set(qn(attribute), value)
    props.append(frame)
def format_word_document(input_filename, output_filename):
    """Apply house formatting to a .docx document.

    Adds a blue SimHei heading style and an indented FangSong body style,
    applies them to all paragraphs, and centers/wraps picture paragraphs.

    Args:
        input_filename: path of the document to format.
        output_filename: path to save the formatted document to.
    """
    # Open the document
    doc = Document(input_filename)
    # Create the heading style.
    # NOTE(review): add_style raises ValueError if 'CustomHeading' already
    # exists (e.g. when this runs twice on the same file) — confirm.
    style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH)
    style.font.name = '黑体'
    style.font.size = Pt(22)  # "No. 2" size (22pt)
    style.font.color.rgb = RGBColor(0, 0, 255)  # blue
    style.paragraph_format.space_after = Pt(12)  # spacing after the heading
    # Create the body style.
    # NOTE(review): for East Asian font names, python-docx may also need the
    # w:eastAsia attribute set for the font to take effect — confirm.
    style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH)
    style.font.name = '仿宋'
    style.font.size = Pt(14)  # "No. 4" size (14pt)
    style.paragraph_format.first_line_indent = Pt(20)  # ~two-character first-line indent
    style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
    style.paragraph_format.line_spacing = 1.5  # line spacing
    style.paragraph_format.space_before = Pt(6)  # spacing before paragraph
    style.paragraph_format.space_after = Pt(6)  # spacing after paragraph
    # Walk all paragraphs: built-in heading styles get the custom heading
    # style, everything else gets the custom body style.
    for paragraph in doc.paragraphs:
        if paragraph.style.name.startswith('Heading'):
            paragraph.style = doc.styles['CustomHeading']
        else:
            paragraph.style = doc.styles['CustomBody']
    # Walk all image relationships; center each picture paragraph and give
    # it top/bottom wrapping plus vertical spacing.
    # NOTE(review): `rel` is not used inside the inner loops, so the same
    # paragraph scan repeats once per image relationship — confirm intended.
    for rel in doc.part.rels.values():
        if "image" in rel.target_ref:
            # Locate the paragraph(s) containing picture runs.
            for paragraph in doc.paragraphs:
                for run in paragraph.runs:
                    if run._element.tag.endswith('}pict'):
                        # Center the picture
                        paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
                        # Use top-and-bottom wrapping for the picture
                        set_picture_wrapping(paragraph)
                        paragraph.paragraph_format.space_before = Pt(12)
                        paragraph.paragraph_format.space_after = Pt(12)
    # output_filename = remove_book_titles(output_filename)
    # Save the document
    doc.save(output_filename)
def crop_and_replace_images(folder_path):
    """Crop the bottom 20% off every .jpg/.png in *folder_path* and save
    the result as a PNG next to the original.

    If the folder does not exist it is created and nothing is processed.

    Args:
        folder_path: directory containing the images; surrounding
            whitespace is stripped before use.
    """
    folder_path = folder_path.strip()
    if not os.path.exists(folder_path):
        os.mkdir(folder_path)
        return
    for filename in os.listdir(folder_path):
        # Only handle common image extensions.
        if not filename.lower().endswith(('.jpg', '.png')):
            continue
        # Fix: test the full path — the old code checked the bare filename
        # against the current working directory, which skipped most files.
        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            continue
        print("文件夹路径:" + folder_path)
        print("文件路径:" + file_path)
        with Image.open(file_path) as img:
            width, height = img.size
            # Crop off the bottom 20% of the image (the old comment said
            # "10px" but the code has always removed 20% of the height).
            cropped_img = img.crop((0, 0, width, height - (height * 0.2)))
            # Fix: split on the LAST dot via splitext — the old first-dot
            # truncation mangled paths containing extra dots.
            output_path = os.path.splitext(file_path)[0] + '.png'
            # Save the cropped image (as PNG, alongside the original).
            cropped_img.save(output_path, 'PNG')
def split_text_into_paragraphs(text):
    """Split *text* on blank-line boundaries, interleaving '' separators.

    Paragraphs are chunks separated by "\\n\\n"; whitespace-only chunks are
    dropped. An empty-string separator is placed between consecutive
    paragraphs (never after the last one).

    Args:
        text: the input text.

    Returns:
        list[str]: paragraphs alternating with '' separators.
    """
    chunks = [part for part in text.split('\n\n') if part.strip()]
    interleaved = []
    for index, chunk in enumerate(chunks):
        # Insert the blank separator before every paragraph except the first.
        if index:
            interleaved.append('')
        interleaved.append(chunk)
    return interleaved
def insert_images_into_paragraphs(paragraphs, image_folder, doc, title):
    """Append *paragraphs* to *doc*, inserting one image after each.

    Images are taken in sorted order from *image_folder*. When the folder
    is missing, only text is added.

    Args:
        paragraphs: list of paragraph strings (may include '' separators).
        image_folder: directory holding the images to interleave.
        doc: python-docx Document to append to.
        title: article title.
            NOTE(review): unused inside this function — confirm intended.
    """
    if os.path.exists(image_folder):
        # Collect image files. NOTE(review): ('jpg') is a plain string, not
        # a 1-tuple, so any name merely ending in 'jpg' matches — confirm.
        images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
                         img.lower().endswith(('jpg'))])
    else:
        images = []
    # Collect and sort the image list
    # images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
    # img.lower().endswith(('jpg'))])
    # images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
    # # img.lower().endswith(('png', 'jpg', 'jpeg'))])
    total_images = len(images)
    image_index = 0
    for i, paragraph in enumerate(paragraphs):
        # Strip the "正文:" (body) marker if present.
        if "正文:" in paragraph:
            paragraph = paragraph.replace("正文:", '')
        p = doc.add_paragraph(paragraph)
        if os.path.exists(image_folder):
            # Insert the next unused image after this paragraph.
            if image_index < total_images:
                img_path = images[image_index]
                # Ensure the image path is valid and the file exists.
                if os.path.exists(img_path):
                    try:
                        with Image.open(img_path) as img:
                            width, height = img.size
                            # Width = aspect_ratio * 1.5in, which pins the
                            # rendered height near 1.5 inches.
                            doc.add_picture(img_path, width=Inches(width / height * 1.5))
                            # Advance only on success: a failed image is
                            # retried after the next paragraph.
                            image_index += 1
                    except Exception as e:
                        print(f"无法识别图像: {img_path}, 错误: {e}")
                        continue
                else:
                    print(f"图片路径无效: {img_path}")
def create_word_document(text, image_folder, output_path, title):
    """Build a .docx from *text* with interleaved images, then format it.

    Args:
        text: article body, split into paragraphs on blank lines.
        image_folder: directory of images to insert between paragraphs.
        output_path: destination .docx path.
        title: article title (forwarded to the paragraph inserter).
    """
    try:
        document = Document()
        body_paragraphs = split_text_into_paragraphs(text)
        insert_images_into_paragraphs(body_paragraphs, image_folder, document, title)
        # modify_document(document)
        document.save(output_path)
        # Formatting failures are reported but do not abort the save.
        try:
            format_word_document(output_path, output_path)
        except Exception as exc:
            print(f"格式化文档 {output_path} 时出错: {exc}")
        print(f'文档已保存到: {output_path}')
    except Exception as exc:
        print(f"创建文档 {output_path} 时出错: {exc}")
# 读取指定路径下txt文本的内容
def read_text_file(file_path):
    """Read a UTF-8 text file, returning '' (and printing) on any failure.

    Args:
        file_path: path of the text file.

    Returns:
        str: the file contents, or '' when the file could not be read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as exc:
        # Best-effort: report and fall back to an empty string.
        print(f"读取文件 {file_path} 时出错: {exc}")
        return ""
def get_file_name(file_path):
    """Return the final path component (the file name) of *file_path*.

    Args:
        file_path: a file path.

    Returns:
        str: the base name of the path.
    """
    _, tail = os.path.split(file_path)
    return tail
def apply_random_style(paragraph):
    """Apply one randomly chosen emphasis style to each run of *paragraph*.

    Each run independently receives one of: bold, italic, underline, a
    random predefined font color, or a random predefined background color.
    """
    # Predefined font colors
    predefined_font_colors = [
        RGBColor(255, 0, 0),  # red
        RGBColor(255, 165, 0),  # orange
        RGBColor(128, 0, 128),  # purple
    ]
    # Predefined background colors (hand-picked RGB values to avoid
    # anything too bright or too dark)
    predefined_bg_colors = [
        RGBColor(240, 240, 240),  # light gray
        RGBColor(255, 255, 224),  # light yellow
        RGBColor(224, 255, 224),  # light green
        RGBColor(224, 255, 255),  # light cyan
        RGBColor(255, 228, 225),  # light pink
        RGBColor(240, 248, 255),  # light blue
    ]
    # Visit every run (a run is a span of text with uniform formatting)
    for run in paragraph.runs:
        # Pick one style at random
        style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background'])
        if style_choice == 'bold':
            run.bold = True
        elif style_choice == 'italic':
            run.italic = True
        elif style_choice == 'underline':
            run.underline = WD_UNDERLINE.SINGLE
        elif style_choice == 'color':
            # Pick a random predefined font color
            run.font.color.rgb = random.choice(predefined_font_colors)
        elif style_choice == 'background':
            # NOTE(review): python-docx's ColorFormat (run.font.color) does
            # not expose a highlight_color attribute — highlighting lives at
            # run.font.highlight_color and expects a WD_COLOR_INDEX enum, not
            # an RGBColor. This branch likely raises or has no effect; confirm.
            run.font.color.highlight_color = random.choice(predefined_bg_colors)
def txt2docx(txt_path, image_path, keep_txt=True):
    """Convert every .txt article under *txt_path* to a .docx, pulling
    images from the matching subfolder of *image_path*.

    Args:
        txt_path: folder containing the .txt articles.
        image_path: folder whose per-article subfolders hold the images.
        keep_txt: when False, each source .txt is deleted after conversion.
    """
    file_path = txt_path
    try:
        txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if
                       txt.lower().endswith(('txt'))])
    except Exception as e:
        print(f"读取文件夹 {file_path} 时出错: {e}")
        sg.popup_error(f"读取文件夹 {file_path} 时出错: {e}")
        return
    img_path = image_path
    for txt in txts:
        try:
            print("正在修改:" + txt)
            text = read_text_file(txt)
            if not text:  # skip this file if reading failed
                print(f"跳过文件: {txt} (读取失败)")
                continue
            # print(text)
            txt_name = get_file_name(txt)
            title_name = txt_name.replace(".txt", "")
            title = title_name
            print(title)
            # Keep only the body after the "正文:" marker (when present)
            # and strip markdown fences.
            if "正文:" in text:
                new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
            else:
                new_text = text.replace("```markdown", "").replace("```", "")
            content = new_text
            # NOTE(review): the import and the img_path rebinding happen
            # inside the loop; after the first iteration img_path is already
            # a Path (Path(Path) is a no-op) — harmless, but both would read
            # better hoisted above the loop.
            from pathlib import Path
            img_path = Path(img_path)
            image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")
            # crop_and_replace_images(image_folder)
            create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)
            # Delete the source txt file if the user chose not to keep it
            if not keep_txt:
                try:
                    os.remove(txt)
                    print(f"已删除原始文件: {txt}")
                except Exception as e:
                    print(f"删除文件 {txt} 时出错: {e}")
            else:
                print(f"保留原始文件: {txt}")
        except Exception as e:
            print(f"处理文件 {txt} 时出错: {e}")
            continue  # move on to the next file
# 加载设置
def load_settings():
    """Load persisted folder settings from SETTINGS_FILE.

    Returns:
        dict: settings with 'folder1' and 'folder2' always present.
        Missing keys are filled with '' so GUI code can index them safely
        even when the JSON file is incomplete or from an older version.
    """
    defaults = {'folder1': '', 'folder2': ''}
    if os.path.exists(SETTINGS_FILE):
        try:
            with open(SETTINGS_FILE, 'r') as f:
                stored = json.load(f)
        except (json.JSONDecodeError, OSError):
            # Fix: a corrupt or unreadable settings file now falls back to
            # defaults instead of crashing the GUI at startup.
            return defaults
        # Fix: merge over defaults so a file missing 'folder1'/'folder2'
        # no longer causes a KeyError when the layout is built.
        defaults.update(stored)
    return defaults
# 保存设置
def save_settings(settings):
    """Persist *settings* as JSON to SETTINGS_FILE.

    Args:
        settings: dict of user settings to serialize.
    """
    serialized = json.dumps(settings)
    with open(SETTINGS_FILE, 'w') as handle:
        handle.write(serialized)
# 自定义函数,用于处理用户选择的文件夹
def process_folders(folder1, folder2, keep_txt=True):
    """Validate both folders exist, then convert the txt articles to docx.

    Shows a GUI popup on missing folders, success, or failure.

    Args:
        folder1: folder containing the article .txt files.
        folder2: folder containing the per-article image subfolders.
        keep_txt: whether to keep the original .txt files after conversion.
    """
    # Guard clauses: both folders must exist before any work starts.
    for path, label in ((folder1, "文章文件夹"), (folder2, "图片文件夹")):
        if not os.path.exists(path):
            sg.popup_error(f"{label}不存在: {path}")
            return
    try:
        txt2docx(folder1, folder2, keep_txt)
        sg.popup("处理完成!")
    except Exception as exc:
        sg.popup_error(f"处理过程中出错: {exc}")
# Load previously saved settings
settings = load_settings()
# Older settings files may predate the keep_txt option — default it to True.
if 'keep_txt' not in settings:
    settings['keep_txt'] = True
# Window layout: two folder pickers, a keep-txt checkbox, confirm/cancel.
layout = [
    [sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()],
    [sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()],
    [sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')],
    [sg.Button('确认'), sg.Button('取消')]
]
# Create the window
window = sg.Window('文件夹选择窗口', layout)
# Event loop
while True:
    event, values = window.read()
    if event == sg.WIN_CLOSED or event == '取消':  # window closed or cancel clicked
        break
    elif event == '确认':  # confirm clicked
        # The two unkeyed Input widgets are indexed positionally (0 and 1).
        folder1 = values[0]
        folder2 = values[1]
        keep_txt = values['keep_txt']
        process_folders(folder1, folder2, keep_txt)
        # Persist the chosen folder paths and the keep-txt option
        settings['folder1'] = folder1
        settings['folder2'] = folder2
        settings['keep_txt'] = keep_txt
        save_settings(settings)
# Close the window
window.close()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,170 @@
import json
import re
import pandas as pd
import requests
from config import *
def text_detection(text):
    """Run Baidu's text-censorship check on *text*.

    Args:
        text: text to inspect.

    Returns:
        str: Baidu's verdict string from the 'conclusion' field.
    """
    token = get_baidu_access_token()
    url = ("https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/"
           "v2/user_defined?access_token=" + token)
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }
    payload = 'text=' + text
    response = requests.request("POST", url, headers=headers, data=payload)
    data = json.loads(str(response.text))
    print(data)
    return data['conclusion']
def get_baidu_access_token():
    """Fetch a Baidu OAuth access token using the configured AK/SK.

    Returns:
        str: the access token (stringified; "None" when the response
        carries no token).
    """
    params = {
        "grant_type": "client_credentials",
        "client_id": CONFIG['Baidu']['api_key'],
        "client_secret": CONFIG['Baidu']['secret_key'],
    }
    response = requests.post("https://aip.baidubce.com/oauth/2.0/token", params=params)
    return str(response.json().get("access_token"))
def safe_filename(filename):
    """Sanitize *filename* for use on Windows-style filesystems.

    Replaces the reserved characters <>:"/\\|?* with underscores, trims
    leading/trailing dots and spaces, and falls back to 'untitled' when
    nothing remains.

    Args:
        filename: the candidate file name.

    Returns:
        str: a filesystem-safe, never-empty file name.
    """
    # One C-level pass over the string instead of chained replace() calls.
    translation = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})
    cleaned = filename.translate(translation).strip('. ')
    return cleaned or 'untitled'
def safe_open_directory(directory_path):
    """Ensure *directory_path* exists, creating all missing parents.

    Newly created directories are chmod'd to 0o777. Failures are logged
    and re-raised.

    Args:
        directory_path: directory to create (path is normalized first).

    Raises:
        Exception: whatever os.makedirs/os.chmod raised, after logging.
    """
    normalized = os.path.normpath(directory_path)
    if os.path.exists(normalized):
        return
    try:
        os.makedirs(normalized, exist_ok=True)
        os.chmod(normalized, 0o777)
    except Exception as exc:
        # Log to the error log before propagating.
        logging.error(f"创建目录失败: {exc}")
        raise
def check_keywords_in_text(text):
    """Check whether *text* contains any configured banned keyword.

    Args:
        text: text to scan.

    Returns:
        bool: True if any non-empty banned keyword occurs in *text*.
    """
    keywords = CONFIG['Keywords']['banned_words'].split(',')
    for keyword in keywords:
        stripped = keyword.strip()
        # Fix: skip empty entries — '' is a substring of every string, so a
        # trailing comma or an empty config used to flag ALL text as banned.
        if stripped and stripped in text:
            return True
    return False
def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """Return the leading part of *text* up to and including the first
    punctuation mark.

    Args:
        text: the input text.
        punctuations: regex character class of punctuation marks
            (default:,。!?;).

    Returns:
        str: the stripped text up to (and including) the first punctuation
        match; the whole stripped text when no punctuation is found.
    """
    found = re.search(punctuations, text)
    # With no match, take the entire string.
    cutoff = found.end() if found else len(text)
    return text[:cutoff].strip()
# 读取Excel表格链接列和类型列的内容并将内容以元组列表的形式返回
def read_excel(file_name):
    """Load (link, type) pairs from an Excel sheet.

    The first column holds the links; the '领域' column (when present)
    holds each link's type, defaulting to '默认' otherwise.

    Args:
        file_name: path to the Excel workbook.

    Returns:
        list[tuple]: (link, type) pairs.
    """
    frame = pd.read_excel(file_name)
    links = frame[frame.columns[0]].tolist()
    if '领域' in frame.columns:
        types = frame['领域'].tolist()
    else:
        types = ['默认'] * len(links)
    pairs = list(zip(links, types))
    print(pairs)
    return pairs
from typing import Tuple
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """Resolve a name collision in *folder_path* by appending '_N'.

    The name is first sanitized via safe_filename(). When no file with
    that name exists it is returned unchanged; otherwise a suffix greater
    than every existing '_N' variant is chosen.

    Args:
        folder_path: target directory.
        filename: desired (possibly colliding) file name.

    Returns:
        Tuple[str, bool]: (final file name, True if it was renamed).
    """
    filename = safe_filename(filename)
    stem, suffix = os.path.splitext(filename)
    if not os.path.exists(os.path.join(folder_path, filename)):
        return filename, False
    siblings = set(os.listdir(folder_path))
    matcher = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(stem), re.escape(suffix)))
    # The bare name counts as suffix 0; '_N' variants contribute N.
    used_numbers = [int(m.group(2)) if m.group(2) else 0
                    for m in map(matcher.match, siblings) if m]
    candidate_num = max(used_numbers) + 1 if used_numbers else 1
    candidate = f"{stem}_{candidate_num}{suffix}"
    # Guard against races: keep bumping until the name is unused.
    while candidate in siblings:
        candidate_num += 1
        candidate = f"{stem}_{candidate_num}{suffix}"
    return candidate, True

24
LICENSE
View File

@ -1,24 +0,0 @@
Software License for MTL
Copyright (c) 2007 The Trustees of Indiana University.
2008 Dresden University of Technology and the Trustees of Indiana University.
2010 SimuNova UG (haftungsbeschränkt), www.simunova.com.
All rights reserved.
Authors: Peter Gottschling and Andrew Lumsdaine
This file is part of the Matrix Template Library
Dresden University of Technology -- short TUD -- and Indiana University -- short IU -- have the exclusive rights to license this product under the following license.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. All redistributions of source code must retain the above copyright notice, the list of authors in the original source code, this list of conditions and the disclaimer listed in this license;
2. All redistributions in binary form must reproduce the above copyright notice, this list of conditions and the disclaimer listed in this license in the documentation and/or other materials provided with the distribution;
3. Any documentation included with all redistributions must include the following acknowledgement:
"This product includes software developed at the University of Notre Dame, the Pervasive Technology Labs at Indiana University, and Dresden University of Technology. For technical information contact Andrew Lumsdaine at the Pervasive Technology Labs at Indiana University. For administrative and license questions contact the Advanced Research and Technology Institute at 1100 Waterway Blvd. Indianapolis, Indiana 46202, phone 317-274-5905, fax 317-274-5902."
Alternatively, this acknowledgement may appear in the software itself, and wherever such third-party acknowledgments normally appear.
4. The name "MTL" shall not be used to endorse or promote products derived from this software without prior written permission from IU or TUD. For written permission, please contact Indiana University Advanced Research & Technology Institute.
5. Products derived from this software may not be called "MTL", nor may "MTL" appear in their name, without prior written permission of Indiana University Advanced Research & Technology Institute.
TUD and IU provide no reassurances that the source code provided does not infringe the patent or any other intellectual property rights of any other entity. TUD and IU disclaim any liability to any recipient for claims brought by any other entity based on infringement of intellectual property rights or otherwise.
LICENSEE UNDERSTANDS THAT SOFTWARE IS PROVIDED "AS IS" FOR WHICH NO WARRANTIES AS TO CAPABILITIES OR ACCURACY ARE MADE. DRESDEN UNIVERSITY OF TECHNOLOGY AND INDIANA UNIVERSITY GIVE NO WARRANTIES AND MAKE NO REPRESENTATION THAT SOFTWARE IS FREE OF INFRINGEMENT OF THIRD PARTY PATENT, COPYRIGHT, OR OTHER PROPRIETARY RIGHTS. DRESDEN UNIVERSITY OF TECHNOLOGY AND INDIANA UNIVERSITY MAKE NO WARRANTIES THAT SOFTWARE IS FREE FROM "BUGS", "VIRUSES", "TROJAN HORSES", "TRAP DOORS", "WORMS", OR OTHER HARMFUL CODE. LICENSEE ASSUMES THE ENTIRE RISK AS TO THE PERFORMANCE OF SOFTWARE AND/OR ASSOCIATED MATERIALS, AND TO THE PERFORMANCE AND VALIDITY OF INFORMATION GENERATED USING SOFTWARE.

View File

@ -1,2 +0,0 @@
# ArticleReplaceBatch

View File

@ -0,0 +1,7 @@
{
"host": "taiyiagi.xyz",
"port": 3306,
"user": "taiyi",
"password": "taiyi1224",
"database": "license_system"
}

13
exeaddpassword/.env Normal file
View File

@ -0,0 +1,13 @@
# 数据库配置
MYSQL_HOST=taiyiagi.xyz
MYSQL_PORT=3306
MYSQL_USER=taiyi
MYSQL_PASSWORD=taiyi1224
MYSQL_DATABASE=exe_licensing
# 加密密钥32字节用于AES-256
# NOTE: the placeholder below is 35 characters — replace it with a real 32-byte key before deployment
ENCRYPTION_KEY=your-32-char-secret-key-here-123456
# 调试模式
DEBUG=false
VERBOSE=false

View File

@ -0,0 +1,4 @@
from flask_sqlalchemy import SQLAlchemy
# 创建数据库实例,独立于应用
db = SQLAlchemy()