Compare commits
No commits in common. "master" and "main" have entirely different histories.
1563
ArticleReplaceBatch/ArticleReplaceDifyBatchWTT.py
Normal file
File diff suppressed because it is too large
38
ArticleReplaceBatch/ArticleReplaceDifyBatchWTT.spec
Normal file
@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-


a = Analysis(
    ['ArticleReplaceDifyBatchWTT.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    noarchive=False,
    optimize=0,
)
pyz = PYZ(a.pure)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.datas,
    [],
    name='ArticleReplaceDifyBatchWTT',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
173
ArticleReplaceBatch/ai_studio.py
Normal file
@@ -0,0 +1,173 @@
import json

import requests

from config import *


# ==========================调用dify工作流===============================================
def call_dify_workflow(input_data):
    """
    调用Dify工作流的函数。

    :param input_data: 传递给工作流的输入数据
    :return: 工作流的输出结果
    """
    logger.info("Dify开始工作。。。")
    api_key = CONFIG['Dify']['api_key']
    user_id = CONFIG['Dify']['user_id']
    url = CONFIG['Dify']['url']

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    data = {
        "inputs": input_data,
        "response_mode": "blocking",
        "user": user_id
    }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    json_data = json.loads(response.text)
    print("json_data:", json_data)

    # 获取article的值
    article = json_data['data']['outputs']['article']
    # print("article:", article)
    return article


# ==========================调用coze工作流==========================


def call_coze_workflow(parameters):
    """
    调用 Coze 工作流的函数

    :param parameters: 传递给工作流的输入参数(字典格式)
    :return: 工作流的执行结果
    """
    logger.info("Coze开始工作。。。。")
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'

    url = "https://api.coze.cn/v1/workflow/run"

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        # data = json.loads(response.text)['data']
        # print("data:",data['output'])

        return response.text
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }


def call_coze_article_workflow(parameters):
    """
    调用 Coze 工作流的函数

    :param parameters: 传递给工作流的输入参数(字典格式)
    :param is_async: 是否异步执行(默认 False)
    :return: 工作流的执行结果
    """

    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'true'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        # data = json.loads(response.text)['data']
        # print("data:",data['output'])
        import ast

        # 直接解析整个result字符串
        result_dict = ast.literal_eval(response.text)

        # 解析data字段
        data_dict = ast.literal_eval(result_dict['data'])

        # 获取output的值
        output_value = data_dict['output']

        return output_value
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }


def call_coze_all_article_workflow(parameters, is_async=False):
    """
    调用 Coze 工作流的函数

    :param parameters: 传递给工作流的输入参数(字典格式)
    :param is_async: 是否异步执行(默认 False)
    :return: 工作流的执行结果
    """
    workflow_id = CONFIG['Coze']['workflow_id']
    access_token = CONFIG['Coze']['access_token']
    is_async = CONFIG['Coze']['is_async'].lower() == 'False'
    url = "https://api.coze.cn/v1/workflow/run"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    data = {
        "workflow_id": workflow_id,
        "parameters": parameters,
        "is_async": is_async
    }

    response = requests.post(url, json=data, headers=headers)

    if response.status_code == 200:
        # data = json.loads(response.text)['data']
        # print("data:",data['output'])
        import ast

        # 直接解析整个result字符串
        result_dict = ast.literal_eval(response.text)
        print(result_dict)

        # 解析data字段
        data_dict = ast.literal_eval(result_dict['data'])

        # 获取output的值
        title = data_dict['title']
        article = data_dict['article']
        return title, article
    else:
        return {
            "error": f"请求失败,状态码:{response.status_code}",
            "detail": response.text
        }
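A minimal calling sketch for the workflow clients above (not part of the commit; it assumes config.ini carries valid Dify and Coze credentials and that the workflows really expose the article/output fields parsed above):

# Illustrative driver only.
from ai_studio import call_dify_workflow, call_coze_article_workflow

if __name__ == "__main__":
    # Dify: the blocking response is expected to contain outputs['article'].
    rewritten = call_dify_workflow({"old_article": "原始文章内容"})
    print(rewritten[:100])

    # Coze: the blocking response is parsed for data['output'].
    output = call_coze_article_workflow({"article": "原始文章内容"})
    print(output)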
31575
ArticleReplaceBatch/article_replace.log
Normal file
File diff suppressed because one or more lines are too long
52
ArticleReplaceBatch/config.ini
Normal file
@@ -0,0 +1,52 @@
[General]
chrome_user_dir = C:\Users\taiyi\AppData\Local\Google\Chrome\User Data
articles_path = articles
images_path = picture
title_file = 文章链接.xlsx
max_threads = 10

[Database]
host = 27.106.125.150
user = root
password = taiyi.1224
database = toutiao

[Dify]
api_key = app-87gssUKFBs9BwJw4m95uUcyF
user_id = toutiao
url = http://27.106.125.150/v1/workflows/run
input_data_template = {"old_article": "{article_text}"}

[Baidu]
api_key = 6GvuZoSEe4L8I7O3p7tZRKhj
secret_key = jDujU3MyzP34cUuTP0GNtPejlQpUFWvl

[ImageModify]
crop_percent = 0.02
min_rotation = 0.02
max_rotation = 0.03
min_brightness = 0.95
max_brightness = 1.09
watermark_text =
watermark_opacity = 128
overlay_opacity = 1

[Keywords]
banned_words = 珠海,落马,股票,股市,股民,爆炸,火灾,死亡,抢劫,诈骗,习大大,习近平,政府,官员,扫黑,警察,落网,嫌疑人,通报,暴力执法,执法,暴力,气象,天气,暴雨,大雨

[Coze]
workflow_id = 7509764025128845366
access_token = pat_EwqZIrV7Y2DHadDWnqkk0k9YxDUxyjIEAfHZFAfnT97mUCexhoTl6McQq3O7mmI8
is_async = true
input_data_template = {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}"}
gen_type = 短篇
templates = {"短篇": {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}", "type":"短篇"}, "文章": {"article": "{article_text}", "link":"{link}", "weijin":"{weijin}", "type":"文章"}}
short_templates = {"1": {"workflow_id": "750191704079481242511231123", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "false", "input_data_template": "{\"title\": \"{title_text}\"}"}}
article_templates = {}
last_used_template = 2. 万能
last_used_template_type = 文章

[Templates]
templates_短篇 = [{"name": "11", "type": "短篇", "workflow_id": "7501917040794812425", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "false", "input_data_template": "{\"title\": \"{title_text}\"}"}, {"name": "123", "workflow_id": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "access_token": "pat_0DczPLquEPhA3mSqokHTPpU9KNHrM3mz5sZKSWxi7ZeWK1Fi5UjPzQihq1DwCQ91", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}]
templates_文章 = [{"name": "1. 情感", "workflow_id": "7520933385113141298", "access_token": "pat_e6f7xXY1Oi8fPxGnBuUV2ed4M8uEdZ4KL6Ncn4359cP5lL6ARCTJg5bVyE4YZxoL", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}, {"name": "2. 万能", "workflow_id": "7509764025128845366", "access_token": "pat_EwqZIrV7Y2DHadDWnqkk0k9YxDUxyjIEAfHZFAfnT97mUCexhoTl6McQq3O7mmI8", "is_async": "true", "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}"}, {"name": "3.七天", "workflow_id": "752935212125560838344", "access_token": "pat_nFWBkrbtjdaQaqKsfmfrYxlhTkjni8QdqE23xPN3V2Bn4sbMQQz24pHuqJzEc3Tm", "is_async": "true"}]

121
ArticleReplaceBatch/config.py
Normal file
@@ -0,0 +1,121 @@
import configparser
import getpass
import logging
import os

# 配置文件路径
CONFIG_FILE = "config.ini"

# 默认配置
DEFAULT_CONFIG = {
    "General": {
        "chrome_user_dir": f"C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data",
        "articles_path": "articles",
        "images_path": "picture",
        "title_file": "文章链接.xlsx",
        "max_threads": "3"
    },
    "Coze": {
        "workflow_id": "",
        "access_token": "",
        "is_async": "false",
        "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
        "last_used_template": "",
        "last_used_template_type": "文章"
    },
    "Database": {
        "host": "27.106.125.150",
        "user": "root",
        "password": "taiyi.1224",
        "database": "toutiao"
    },
    "Dify": {
        "api_key": "app-87gssUKFBs9BwJw4m95uUcyF",
        "user_id": "toutiao",
        "url": "http://27.106.125.150/v1/workflows/run"
    },
    "Baidu": {
        "api_key": "6GvuZoSEe4L8I7O3p7tZRKhj",
        "secret_key": "jDujU3MyzP34cUuTP0GNtPejlQpUFWvl"
    },
    "ImageModify": {
        "crop_percent": "0.02",
        "min_rotation": "0.3",
        "max_rotation": "3.0",
        "min_brightness": "0.8",
        "max_brightness": "1.2",
        "watermark_text": "Qin Quan Shan Chu",
        "watermark_opacity": "128",
        "overlay_opacity": "30"
    },
    "Keywords": {
        "banned_words": "珠海,落马,股票,股市,股民,爆炸,火灾,死亡,抢劫,诈骗,习大大,习近平,政府,官员,扫黑,警察,落网,嫌疑人,通报,暴力执法,执法,暴力,气象,天气,暴雨,大雨"
    }
}


# 加载配置
def load_config():
    config = configparser.ConfigParser()

    # 如果配置文件不存在,创建默认配置
    if not os.path.exists(CONFIG_FILE):
        for section, options in DEFAULT_CONFIG.items():
            config[section] = options

        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            config.write(f)
    else:
        config.read(CONFIG_FILE, encoding='utf-8')

        # 检查并添加缺失的配置项
        for section, options in DEFAULT_CONFIG.items():
            if not config.has_section(section):
                config[section] = {}

            for option, value in options.items():
                if not config.has_option(section, option):
                    config[section][option] = value

        # 保存更新后的配置
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            config.write(f)

    return config


# 保存配置
def save_config(config):
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        config.write(f)


# 加载配置
CONFIG = load_config()

# 更新全局变量
USER_DIR_PATH = CONFIG['General']['chrome_user_dir']
ARTICLES_BASE_PATH = CONFIG['General']['articles_path']
IMGS_BASE_PATH = CONFIG['General']['images_path']
TITLE_BASE_PATH = CONFIG['General']['title_file']
MAX_THREADS = int(CONFIG['General']['max_threads'])

# 创建必要的目录
if not os.path.exists(ARTICLES_BASE_PATH):
    os.makedirs(ARTICLES_BASE_PATH)
    os.chmod(ARTICLES_BASE_PATH, 0o777)
if not os.path.exists(IMGS_BASE_PATH):
    os.makedirs(IMGS_BASE_PATH)
    os.chmod(IMGS_BASE_PATH, 0o777)

# 日志配置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("article_replace.log", encoding='utf-8'),
                        logging.StreamHandler()
                    ])
logger = logging.getLogger(__name__)

# 日志文件保存路径
LOG_FILE = "article_replace.log"
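A short sketch of how the other modules in this diff consume this config (an assumption based on their `from config import *` imports; illustrative only):

# Illustrative only: config.py exposes CONFIG, logger and the path constants.
from config import CONFIG, logger, ARTICLES_BASE_PATH, MAX_THREADS

logger.info(f"articles dir: {ARTICLES_BASE_PATH}, max threads: {MAX_THREADS}")
banned = CONFIG['Keywords']['banned_words'].split(',')
logger.info(f"{len(banned)} banned keywords loaded")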
86
ArticleReplaceBatch/databases.py
Normal file
@@ -0,0 +1,86 @@
import pymysql


# ==============================数据库模块===================================
def check_link_exists(host, user, password, database, link):
    """
    检查指定的 link 是否存在于 MySQL 数据库表中,如果不存在,则插入该链接
    :param host: MySQL 数据库主机地址
    :param user: MySQL 用户名
    :param password: MySQL 密码
    :param database: 数据库名称
    :param link: 需要检查的链接
    :return: 如果链接存在,返回 True;如果链接不存在且插入成功,返回 False
    """
    connection = None  # 确保 connection 被初始化

    try:
        # 连接到 MySQL 数据库
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )

        with connection.cursor() as cursor:
            # 查询链接是否存在
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            result = cursor.fetchone()

            # 如果链接存在
            if result:
                return True
            else:
                return False

    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return False
    finally:
        # 确保在结束时关闭连接
        if connection:
            connection.close()


def check_link_insert(host, user, password, database, link):
    """
    检查指定的 link 是否存在于 MySQL 数据库表中,如果不存在,则插入该链接
    :param host: MySQL 数据库主机地址
    :param user: MySQL 用户名
    :param password: MySQL 密码
    :param database: 数据库名称
    :param link: 需要检查的链接
    :return: 如果链接存在,返回 True;如果链接不存在且插入成功,返回 False
    """
    connection = None  # 确保 connection 被初始化
    try:
        # 连接到 MySQL 数据库
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )

        with connection.cursor() as cursor:
            # 查询链接是否存在
            cursor.execute("SELECT 1 FROM links WHERE link = %s", (link,))
            result = cursor.fetchone()
            if result:
                # 如果链接已经存在,返回 True
                return True
            else:
                # 插入链接
                cursor.execute("INSERT INTO links (link) VALUES (%s)", (link,))
                connection.commit()  # 提交事务
                print("链接已插入")
                return False
    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        return False
    finally:
        # 确保在结束时关闭连接
        if connection:
            connection.close()
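A minimal usage sketch for the link-deduplication helpers above (assumes the `toutiao` database from config.ini has a `links` table with a `link` column; the URL is hypothetical):

# Illustrative only.
from config import CONFIG
from databases import check_link_insert

db = CONFIG['Database']
already_seen = check_link_insert(db['host'], db['user'], db['password'],
                                 db['database'], "https://www.toutiao.com/article/123/")
print("skip" if already_seen else "newly recorded")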
341
ArticleReplaceBatch/images_edit.py
Normal file
@@ -0,0 +1,341 @@
import logging
import os
import random

import requests
from PIL import Image
from PIL import ImageDraw, ImageFont, ImageEnhance

from config import *
from utils import safe_open_directory, safe_filename

IMGS_BASE_PATH = CONFIG['General']['images_path']


def crop_and_replace_images(folder_path):
    """
    修改图片尺寸
    :param folder_path:
    :return:
    """
    print("开始处理图片。。。。")
    # 遍历文件夹中的所有文件
    for filename in os.listdir(folder_path):
        # 检查文件扩展名是否为图片格式
        if filename.lower().endswith(('.jpg')):
            # 拼接完整的文件路径
            file_path = os.path.join(folder_path, filename)
            print("文件夹路径:" + folder_path)
            print("文件路径:" + file_path)
            # 打开图片
            with Image.open(file_path) as img:
                # 获取图片的尺寸
                width, height = img.size
                # 裁剪图片,裁剪下方10px
                print("裁剪图片。。。")
                cropped_img = img.crop((0, 0, width, height - (height * 0.1)))
                # 保存裁剪后的图片,覆盖原文件
                # 通过拉伸使改变裁剪后图片的尺寸与原图片尺寸相同
                resized_img = cropped_img.resize((width, height))
                # output_path = file_path[0:file_path.find('.')] + '.png'

                resized_img.save(file_path, 'JPEG')


def deduplicate_images(folder_path):
    print("开始对图片去重。。。")
    """扫描 folder_path 下的图片,对每张图片做修改并直接覆盖原文件"""
    if not os.path.exists(folder_path):
        print("错误:输入文件夹不存在!")
        return

    supported_ext = ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.webp')

    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(supported_ext):
                file_path = os.path.join(root, file)
                try:
                    with Image.open(file_path) as img:
                        modified_img = modify_image(img)
                        modified_img.save(file_path)  # 直接覆盖原图片
                        print(f"已处理并覆盖:{file_path}")
                except Exception as e:
                    print(f"处理 {file_path} 时出错:{e}")


def download_image(image_url, save_path):
    """
    下载图片并保存
    :param image_url: 图片链接
    :param save_path: 保存路径
    :return:
    """
    try:
        response = requests.get(image_url)
        if response.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(response.content)
            print(f"图片下载成功,保存路径为:{save_path}")
        else:
            print(f"图片下载失败,状态码为:{response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"请求出错:{e}")


def download_and_process_images(img_urls, article_title, save_dir=None):
    """
    下载并处理图片
    :param img_urls: 图片URL列表
    :param article_title: 文章标题
    :param save_dir: 自定义保存目录,如果为None则使用默认目录
    """
    if save_dir is None:
        save_dir = IMGS_BASE_PATH

    # 使用safe_filename处理文章标题
    safe_title = safe_filename(article_title)
    # 使用os.path.normpath来规范化路径,避免路径分隔符的问题
    img_dir_path = os.path.normpath(os.path.join(str(save_dir), safe_title))
    logger.info(f"图片保存路径:{img_dir_path}")
    safe_open_directory(img_dir_path)

    for i, img_url in enumerate(img_urls):
        if img_url.startswith("https"):
            imgurl = img_url
        else:
            imgurl = "https:" + img_url
        # 使用os.path.normpath来规范化图片路径
        img_path = os.path.normpath(os.path.join(img_dir_path, f"图片{i}.jpg"))
        try:
            download_image(imgurl, img_path)
            # 只处理当前下载的图片,而不是整个文件夹
            with Image.open(img_path) as img:
                modified_img = modify_image(img)
                modified_img.save(img_path)  # 直接覆盖原图片
                print(f"已处理并覆盖:{img_path}")
        except Exception as e:
            logging.error(f"处理图片失败: {e}")

# def download_and_process_images(img_urls, article_title, save_dir=None):
#     """
#     下载并处理图片
#     :param img_urls: 图片URL列表
#     :param article_title: 文章标题
#     :param save_dir: 自定义保存目录,如果为None则使用默认目录
#     """
#     if save_dir is None:
#         save_dir = IMGS_BASE_PATH
#
#     img_dir_path = os.path.join(str(save_dir), str(article_title))
#     logger.info(f"图片保存路径:{img_dir_path}")
#     safe_open_directory(img_dir_path)
#
#     for i, img_url in enumerate(img_urls):
#         if img_url.startswith("https"):
#             imgurl = img_url
#         else:
#             imgurl = "https:"+img_url
#         img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
#         try:
#             download_image(imgurl, img_path)
#             # crop_and_replace_images(img_dir_path)
#             deduplicate_images(img_dir_path)
#         except Exception as e:
#             logging.error(f"处理图片失败: {e}")


# def modify_image(img):
#     print("修改图片")
#     """对图片应用去重处理,不翻转,仅裁剪、旋转、亮度调整、添加水印、加透明蒙版"""
#     width, height = img.size
#
#     # 从配置中获取参数
#     crop_percent = float(CONFIG['ImageModify']['crop_percent'])
#     min_rotation = float(CONFIG['ImageModify']['min_rotation'])
#     max_rotation = float(CONFIG['ImageModify']['max_rotation'])
#     min_brightness = float(CONFIG['ImageModify']['min_brightness'])
#     max_brightness = float(CONFIG['ImageModify']['max_brightness'])
#     watermark_text = CONFIG['ImageModify']['watermark_text']
#     watermark_opacity = int(CONFIG['ImageModify']['watermark_opacity'])
#     overlay_opacity = int(CONFIG['ImageModify']['overlay_opacity'])
#
#     # 1. 裁剪边缘
#     crop_px_w = int(width * crop_percent)
#     crop_px_h = int(height * crop_percent)
#     img = img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))
#
#     # 2. 随机旋转
#     angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
#     img = img.rotate(angle, expand=True)
#
#     # 3. 调整亮度
#     enhancer = ImageEnhance.Brightness(img)
#     factor = random.uniform(min_brightness, max_brightness)  # 亮度调整因子
#     img = enhancer.enhance(factor)
#
#     # 4. 添加文字水印
#     draw = ImageDraw.Draw(img)
#     font_size = max(20, int(min(img.size) * 0.05))
#     try:
#         font = ImageFont.truetype("arial.ttf", font_size)
#     except:
#         font = ImageFont.load_default()
#
#     # 获取文本尺寸
#     text_width, text_height = draw.textbbox((0, 0), watermark_text, font=font)[2:]
#
#     # 水印放在图片右下角
#     x = img.size[0] - text_width - 5
#     y = img.size[1] - text_height - 5
#     draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, watermark_opacity))
#
#     # 5. 添加半透明蒙版
#     overlay = Image.new('RGBA', img.size, (255, 255, 255, overlay_opacity))
#     if img.mode != 'RGBA':
#         img = img.convert('RGBA')
#     img = Image.alpha_composite(img, overlay)
#
#     return img.convert('RGB')


def modify_image(img):
    """
    对图片应用去重处理,不翻转,仅裁剪、旋转、亮度调整、添加水印、加透明蒙版
    参数:
        img: PIL.Image对象,要处理的图片
    返回:
        PIL.Image对象,处理后的图片
    """
    print("修改图片")
    # 确保图片是RGB模式
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # 从配置中获取参数
    config = CONFIG['ImageModify']
    crop_percent = float(config['crop_percent'])
    min_rotation = float(config['min_rotation'])
    max_rotation = float(config['max_rotation'])
    min_brightness = float(config['min_brightness'])
    max_brightness = float(config['max_brightness'])
    watermark_text = config['watermark_text']
    watermark_opacity = int(config['watermark_opacity'])
    overlay_opacity = int(config['overlay_opacity'])
    # 1. 新增功能:裁剪图片下方20px
    img = crop_bottom(img, 20)
    # 2. 裁剪边缘
    img = crop_edges(img, crop_percent)
    # 3. 随机旋转
    img = random_rotate(img, min_rotation, max_rotation)
    # 4. 调整亮度
    img = adjust_brightness(img, min_brightness, max_brightness)
    # 5. 添加文字水印
    img = add_watermark(img, watermark_text, watermark_opacity)
    # 6. 添加半透明蒙版
    img = add_overlay(img, overlay_opacity)
    # 返回RGB模式的图片
    return img.convert('RGB')


def crop_bottom(img, pixels):
    """
    裁剪图片底部指定像素
    参数:
        img: PIL.Image对象,要裁剪的图片
        pixels: int,要裁剪的像素数
    返回:
        PIL.Image对象,裁剪后的图片
    """
    width, height = img.size
    if height > pixels:  # 确保图片高度大于要裁剪的像素
        return img.crop((0, 0, width, height - pixels))
    return img


def crop_edges(img, percent):
    """
    按比例裁剪图片边缘
    参数:
        img: PIL.Image对象,要裁剪的图片
        percent: float,裁剪比例(0-1之间)
    返回:
        PIL.Image对象,裁剪后的图片
    """
    width, height = img.size
    crop_px_w = int(width * percent)
    crop_px_h = int(height * percent)
    return img.crop((crop_px_w, crop_px_h, width - crop_px_w, height - crop_px_h))


def random_rotate(img, min_rotation, max_rotation):
    """
    随机旋转图片
    参数:
        img: PIL.Image对象,要旋转的图片
        min_rotation: float,最小旋转角度
        max_rotation: float,最大旋转角度
    返回:
        PIL.Image对象,旋转后的图片
    """
    angle = random.uniform(min_rotation, max_rotation) * random.choice([-1, 1])
    return img.rotate(angle, expand=True)


def adjust_brightness(img, min_brightness, max_brightness):
    """
    调整图片亮度
    参数:
        img: PIL.Image对象,要调整亮度的图片
        min_brightness: float,最小亮度因子
        max_brightness: float,最大亮度因子
    返回:
        PIL.Image对象,调整亮度后的图片
    """
    enhancer = ImageEnhance.Brightness(img)
    factor = random.uniform(min_brightness, max_brightness)
    return enhancer.enhance(factor)


def add_watermark(img, text, opacity):
    """
    添加文字水印到图片右下角
    参数:
        img: PIL.Image对象,要添加水印的图片
        text: str,水印文本
        opacity: int,水印透明度(0-255)
    返回:
        PIL.Image对象,添加水印后的图片
    """
    # 确保图片是RGBA模式以支持透明度
    if img.mode != 'RGBA':
        img = img.convert('RGBA')
    draw = ImageDraw.Draw(img)
    font_size = max(20, int(min(img.size) * 0.05))
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except:
        font = ImageFont.load_default()
    # 获取文本尺寸
    text_width, text_height = draw.textbbox((0, 0), text, font=font)[2:]
    # 确保水印不超出图片边界
    x = max(5, img.size[0] - text_width - 5)
    y = max(5, img.size[1] - text_height - 5)
    # 添加水印
    draw.text((x, y), text, font=font, fill=(255, 255, 255, opacity))
    return img


def add_overlay(img, opacity):
    """
    添加半透明蒙版
    参数:
        img: PIL.Image对象,要添加蒙版的图片
        opacity: int,蒙版透明度(0-255)
    返回:
        PIL.Image对象,添加蒙版后的图片
    """
    # 确保图片是RGBA模式以支持透明度
    if img.mode != 'RGBA':
        img = img.convert('RGBA')
    overlay = Image.new('RGBA', img.size, (255, 255, 255, opacity))
    return Image.alpha_composite(img, overlay)
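A small sketch of the per-image pipeline defined above (illustrative only; the path is hypothetical):

# Illustrative only: run modify_image on one local file.
from PIL import Image
from images_edit import modify_image

with Image.open("picture/示例/图片0.jpg") as img:
    deduped = modify_image(img)  # crop bottom/edges, rotate, brightness, watermark, overlay
    deduped.save("picture/示例/图片0.jpg")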
253
ArticleReplaceBatch/main_process_wtt.py
Normal file
@@ -0,0 +1,253 @@
import threading
import queue
import json  # 导入 json 模块

from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow
from databases import *

from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *


# ==============================主程序===========================
def process_link(link_info, ai_service, current_template=None,generation_type=None):
    link, article_type = link_info  # 解包链接和类型信息
    try:
        if link.startswith("https://www.toutiao.com"):
            title_text, article_text, img_urls = toutiao_w_extract_content(link)
            if title_text == "":
                title_text, article_text, img_urls = toutiao_extract_content(link)
        elif link.startswith("https://mp.weixin.qq.co"):
            title_text, article_text, img_urls = wechat_extract_content(link)
        elif link.startswith("https://www.163.com"):
            title_text, article_text, img_urls = wangyi_extract_content(link)
        else:
            title_text, article_text, img_urls = "", "", []

        if title_text == "":
            return
        elif len(title_text) > 100:
            return

        # 获取数据库配置
        host = CONFIG['Database']['host']
        user = CONFIG['Database']['user']
        password = CONFIG['Database']['password']
        database = CONFIG['Database']['database']

        # 判断文章内容是否有违禁词
        check_keywords = check_keywords_in_text(title_text)

        title = extract_content_until_punctuation(article_text).replace("正文:", "")

        from datetime import datetime
        # 获取当前时间并格式化
        current_time = datetime.now().strftime("%H:%M:%S")
        # 打印当前时间
        print("当前时间:", current_time)

        if ai_service == "dify":
            if check_keywords:
                print("文章中有违禁词!")
                check_link_insert(host, user, password, database, link)
                return
            input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}')
            try:
                input_data_template = json.loads(input_data_template_str)
                input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
            except (json.JSONDecodeError, KeyError, AttributeError) as e:
                logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
                input_data = {"old_article": article_text}
            message_content = call_dify_workflow(input_data)

        elif ai_service == "coze":
            logger.info("coze正在处理")
            logger.info(f"正在处理的文章类型为:{generation_type}")
            if current_template:
                original_config = {
                    'workflow_id': CONFIG['Coze']['workflow_id'],
                    'access_token': CONFIG['Coze']['access_token'],
                    'is_async': CONFIG['Coze']['is_async']
                }

                CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
                CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
                CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')

                logger.info(f"应用模板配置: {current_template.get('name')}")
                logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
                logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
                logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")

            try:
                input_data_template_str = CONFIG['Coze'].get('input_data_template')
                input_data_template = json.loads(input_data_template_str)

                if generation_type == "短篇":
                    input_data = {"article": article_text}
                    print("coze中输入:", input_data)
                    message_content = call_coze_article_workflow(input_data)
                elif generation_type == "文章":
                    print("原文中标题为:", title_text)
                    print("原文中内容为:", article_text)
                    input_data = {"title": title_text, "article": article_text}
                    print("发送的请求数据为:", input_data)
                    title, message_content = call_coze_all_article_workflow(input_data)
            finally:
                if 'original_config' in locals():
                    CONFIG['Coze'].update(original_config)

        # 去除标题首尾的空格
        title_text = title_text.strip()

        # 创建类型目录
        type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
        safe_open_directory(type_dir)

        # 在类型目录下保存文章
        file_name = ""
        if generation_type == '短篇':
            file_name = handle_duplicate_files_advanced(type_dir, title_text.strip())[0]
        elif generation_type == "文章":
            file_name = handle_duplicate_files_advanced(type_dir, title.strip())[0]

        article_save_path = os.path.join(type_dir, f"{file_name}.txt")

        # 判断文章合规度
        if text_detection(message_content) == "合规":
            print("文章合规")
            pass
        else:
            print("文章不合规")
            return

        with open(article_save_path, 'w', encoding='utf-8') as f:
            f.write(message_content)
            logging.info('文本已经保存')

        if img_urls:
            # 在类型目录下创建图片目录
            type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
            safe_open_directory(type_picture_dir)
            # 确保文件名没有多余空格
            download_and_process_images(img_urls, file_name.strip(), type_picture_dir)

    except Exception as e:
        logging.error(f"处理链接 {link} 时出错: {e}")
        raise


def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None):
    use_link_path = 'use_link_path.txt'

    # 读取链接
    links = read_excel(TITLE_BASE_PATH)

    # 过滤已处理的链接
    filtered_links = []
    host = CONFIG['Database']['host']
    user = CONFIG['Database']['user']
    password = CONFIG['Database']['password']
    database = CONFIG['Database']['database']

    for link_info in links:
        link = link_info[0].strip()  # 获取链接并去除空白字符
        # 如果Excel中有类型,使用Excel中的类型,否则使用传入的generation_type
        article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
        logging.info(f"总共{len(links)}个链接")
        # if check_link_exists(host, user, password, database, link):
        #     logger.info(f"链接已存在: {link}")
        #     continue
        # else:
        filtered_links.append((link, article_type))  # 保存链接和类型的元组
        # logger.info(f"链接不存在: {link}")
        # print("链接不存在,存储到过滤器中:", link)

    if not filtered_links:
        logger.info("没有新链接需要处理")
        return []

    # 使用多线程处理链接
    results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template,generation_type)

    # 记录已处理的链接
    with open(use_link_path, 'a+', encoding='utf-8') as f:
        for link, success, _ in results:
            if success:
                f.write(link + "\n")

    return results


# 创建一个任务队列和结果队列
task_queue = queue.Queue()
result_queue = queue.Queue()


# 工作线程函数
def worker(ai_service, current_template=None,generation_type=None):
    while True:
        try:
            # 从队列中获取任务
            link = task_queue.get()
            if link is None:  # 结束信号
                break

            # 处理链接
            try:
                logger.info(f"开始处理链接:{link}")
                process_link(link, ai_service, current_template,generation_type)
                result_queue.put((link, True, None))  # 成功
            except Exception as e:
                result_queue.put((link, False, str(e)))  # 失败
                logger.error(f"处理链接 {link} 时出错: {e}")

            # 标记任务完成
            task_queue.task_done()
        except Exception as e:
            logger.error(f"工作线程出错: {e}")


# 多线程处理链接
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None,generation_type=None):
    if num_threads is None:
        num_threads = min(MAX_THREADS, len(links))
    else:
        num_threads = min(num_threads, MAX_THREADS, len(links))

    # 清空任务队列和结果队列
    while not task_queue.empty():
        task_queue.get()
    while not result_queue.empty():
        result_queue.get()

    # 创建工作线程
    threads = []

    # 将AI服务选择和模板配置传递给worker函数
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(ai_service, current_template,generation_type))
        t.daemon = True
        t.start()
        threads.append(t)

    # 添加任务到队列
    for link in links:
        task_queue.put(link)

    # 添加结束信号
    for _ in range(num_threads):
        task_queue.put(None)

    # 等待所有线程完成
    for t in threads:
        t.join()

    # 处理结果
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results
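A hedged sketch of the top-level entry point above (assumes 文章链接.xlsx from config.ini lists the source links and that a [Templates] entry is passed as current_template when needed; illustrative only):

# Illustrative only.
from main_process_wtt import link_to_text

results = link_to_text(num_threads=3, ai_service="coze",
                       current_template=None, generation_type="文章")
for link, ok, err in results:
    print(link, "OK" if ok else f"failed: {err}")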
BIN
ArticleReplaceBatch/output.docx
Normal file
Binary file not shown.
433
ArticleReplaceBatch/replacestr.py
Normal file
@@ -0,0 +1,433 @@
import re
import random
import argparse
import sys
import os
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path
import logging


class TextProcessor:
    """文本处理器类,支持句子拆分和字符交换"""

    def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
        """
        初始化文本处理器

        Args:
            min_length: 句子长度阈值
            custom_punctuation: 自定义标点符号,如果为None则使用默认标点
        """
        self.min_length = min_length
        self.sentence_endings = custom_punctuation or r'[,!?;?!;]'
        self.statistics = {
            'total_sentences': 0,
            'processed_sentences': 0,
            'total_chars': 0,
            'swapped_chars': 0
        }

        # 设置日志
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def split_sentences(self, text: str) -> List[Tuple[str, str]]:
        """
        按标点符号拆分句子,保留标点符号

        Args:
            text: 输入文本

        Returns:
            List[Tuple[str, str]]: 每个元组包含 (句子内容, 标点符号)
        """
        if not text.strip():
            return []

        # 使用正则表达式拆分,保留分隔符
        parts = re.split(f'({self.sentence_endings})', text)

        sentences = []
        i = 0
        while i < len(parts):
            content = parts[i].strip()
            if content:  # 非空内容
                # 检查下一个部分是否是标点符号
                if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]):
                    punctuation = parts[i + 1]
                    i += 2
                else:
                    punctuation = ''
                    i += 1
                sentences.append((content, punctuation))
                self.statistics['total_sentences'] += 1
            else:
                i += 1

        return sentences

    def swap_random_chars(self, sentence: str) -> str:
        """
        对超长句子随机交换相邻两个字符的顺序

        Args:
            sentence: 输入句子

        Returns:
            str: 处理后的句子
        """
        # 边界情况处理
        if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
            return sentence

        # 转换为字符列表便于操作
        chars = list(sentence)
        original_length = len(chars)

        # 确定可交换的范围(避开首尾字符,且需要成对相邻)
        # 对于长度为n的句子,可交换的相邻对位置为:(1,2), (2,3), ..., (n-3,n-2)
        start_idx = 1
        end_idx = len(chars) - 3  # 最后一个可交换对的起始位置

        if end_idx < start_idx:
            return sentence

        try:
            # 随机选择一个相邻对的起始位置
            swap_start = random.randint(start_idx, end_idx)
            swap_end = swap_start + 1

            # 交换相邻的两个字符
            chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start]

            # 更新统计信息
            self.statistics['processed_sentences'] += 1
            self.statistics['swapped_chars'] += 2

            self.logger.debug(f"交换相邻位置 {swap_start} 和 {swap_end},句子长度:{original_length}")

        except (ValueError, IndexError) as e:
            self.logger.warning(f"字符交换失败:{e}")
            return sentence

        return ''.join(chars)

    def process_text(self, text: str) -> str:
        """
        处理文本:拆分句子并对超长句子进行字符交换

        Args:
            text: 输入文本

        Returns:
            str: 处理后的文本
        """
        if not text:
            return text

        # 重置统计信息
        self.statistics = {
            'total_sentences': 0,
            'processed_sentences': 0,
            'total_chars': len(text),
            'swapped_chars': 0
        }

        # 按段落分割
        paragraphs = text.split('\n')
        processed_paragraphs = []

        for paragraph in paragraphs:
            if not paragraph.strip():
                processed_paragraphs.append(paragraph)
                continue

            # 拆分句子
            sentences = self.split_sentences(paragraph)

            # 处理每个句子
            processed_sentences = []
            for sentence_content, punctuation in sentences:
                # 对句子内容进行字符交换
                processed_content = self.swap_random_chars(sentence_content)
                processed_sentences.append(processed_content + punctuation)

            # 重新组合句子
            processed_paragraph = ''.join(processed_sentences)
            processed_paragraphs.append(processed_paragraph)

        return '\n'.join(processed_paragraphs)

    def get_statistics(self) -> Dict[str, Any]:
        """获取处理统计信息"""
        return self.statistics.copy()

    def print_statistics(self):
        """打印处理统计信息"""
        stats = self.get_statistics()
        print("\n" + "=" * 50)
        print("处理统计信息:")
        print(f"总字符数:{stats['total_chars']}")
        print(f"总句子数:{stats['total_sentences']}")
        print(f"处理句子数:{stats['processed_sentences']}")
        print(f"交换字符数:{stats['swapped_chars']}")
        if stats['total_sentences'] > 0:
            print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
        print("=" * 50)


class FileHandler:
    """文件处理器,负责文件的读写操作"""

    @staticmethod
    def read_file(filename: str) -> str:
        """
        读取文件内容,支持多种编码

        Args:
            filename: 文件路径

        Returns:
            str: 文件内容

        Raises:
            FileNotFoundError: 文件不存在
            PermissionError: 权限不足
            UnicodeDecodeError: 编码错误
        """
        if not os.path.exists(filename):
            raise FileNotFoundError(f"文件 '{filename}' 不存在")

        if not os.access(filename, os.R_OK):
            raise PermissionError(f"没有读取文件 '{filename}' 的权限")

        # 尝试多种编码格式
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']

        for encoding in encodings:
            try:
                with open(filename, 'r', encoding=encoding) as f:
                    content = f.read()
                    logging.info(f"使用 {encoding} 编码成功读取文件:{filename}")
                    return content
            except UnicodeDecodeError:
                continue

        # UnicodeDecodeError 需要完整的 5 个构造参数
        raise UnicodeDecodeError('unknown', b'', 0, 1,
                                 f"无法解码文件 '{filename}',尝试的编码格式:{encodings}")

    @staticmethod
    def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
        """
        写入文件内容

        Args:
            filename: 输出文件路径
            content: 要写入的内容
            encoding: 编码格式

        Raises:
            PermissionError: 权限不足
            OSError: 磁盘空间不足等系统错误
        """
        # 确保目录存在
        output_dir = os.path.dirname(filename)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)

        try:
            with open(filename, 'w', encoding=encoding) as f:
                f.write(content)
            logging.info(f"成功写入文件:{filename}")
        except PermissionError:
            raise PermissionError(f"没有写入文件 '{filename}' 的权限")
        except OSError as e:
            raise OSError(f"写入文件 '{filename}' 时发生错误:{e}")


def setup_argument_parser() -> argparse.ArgumentParser:
    """设置命令行参数解析器"""
    parser = argparse.ArgumentParser(
        description='文本句子字符交换处理器',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s -f input.txt                    # 处理文件
  %(prog)s -t "你的文本内容"               # 直接处理文本
  %(prog)s -f input.txt -l 20              # 设置长度阈值为20
  %(prog)s -f input.txt -o output.txt      # 输出到文件
  %(prog)s -f input.txt -p "。!?" -s      # 自定义标点符号并显示统计
        """
    )

    # 输入选项
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument('-f', '--file', help='输入文件路径')
    input_group.add_argument('-t', '--text', help='直接输入文本')
    input_group.add_argument('--stdin', action='store_true',
                             help='从标准输入读取文本')

    # 处理选项
    parser.add_argument('-l', '--length', type=int, default=30,
                        help='句子长度阈值(默认30)')
    parser.add_argument('-p', '--punctuation',
                        help='自定义标点符号(默认:。!?;?!;)')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('-e', '--encoding', default='utf-8',
                        help='输出文件编码(默认utf-8)')

    # 其他选项
    parser.add_argument('-s', '--statistics', action='store_true',
                        help='显示处理统计信息')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='显示详细日志')
    parser.add_argument('--seed', type=int, help='随机数种子(用于测试)')

    return parser


def main():
    """主函数:处理命令行参数和文本处理"""
    parser = setup_argument_parser()
    args = parser.parse_args()

    # 设置日志级别
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # 设置随机数种子(用于测试)
    if args.seed:
        random.seed(args.seed)

    # 获取输入文本
    try:
        if args.file:
            text = FileHandler.read_file(args.file)
        elif args.text:
            text = args.text
        elif args.stdin:
            text = sys.stdin.read()
        else:
            print("错误:请指定输入源")
            sys.exit(1)

        if not text.strip():
            print("警告:输入文本为空")
            sys.exit(0)

    except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
        print(f"错误:{e}")
        sys.exit(1)

    # 创建处理器并处理文本
    try:
        processor = TextProcessor(
            min_length=args.length,
            custom_punctuation=args.punctuation
        )

        processed_text = processor.process_text(text)

        # 输出结果
        if args.output:
            FileHandler.write_file(args.output, processed_text, args.encoding)
            print(f"处理完成,结果已保存到 '{args.output}'")
        else:
            print("处理结果:")
            print("-" * 50)
            print(processed_text)

        # 显示统计信息
        if args.statistics:
            processor.print_statistics()

    except Exception as e:
        print(f"处理过程中发生错误:{e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)


# 单元测试
def run_tests():
    """运行基本的单元测试"""
    print("运行单元测试...")

    # 测试句子拆分
    processor = TextProcessor(min_length=6)

    # 测试1:普通句子拆分
    test_text = "这是第一句。这是第二句!第三句?"
    sentences = processor.split_sentences(test_text)
    assert len(sentences) == 3, f"期望3个句子,实际{len(sentences)}个"
    assert sentences[0] == ("这是第一句", "。"), f"第一句解析错误:{sentences[0]}"

    # 测试2:相邻字符交换
    long_sentence = "这是一个很长的句子用来测试字符交换功能"
    random.seed(42)  # 固定种子以便测试
    result = processor.swap_random_chars(long_sentence)
    assert result != long_sentence, "长句子应该被修改"
    assert len(result) == len(long_sentence), "交换后长度应该不变"

    # 验证只交换了相邻的两个字符
    diff_count = sum(1 for i, (a, b) in enumerate(zip(long_sentence, result)) if a != b)
    assert diff_count == 2, f"应该只有2个字符位置发生变化,实际{diff_count}个"

    # 测试3:短句子不变
    short_sentence = "短句"
    result = processor.swap_random_chars(short_sentence)
    assert result == short_sentence, "短句子不应该被修改"

    # 测试4:边界情况
    empty_result = processor.swap_random_chars("")
    assert empty_result == "", "空字符串应该保持不变"

    print("✓ 所有测试通过!")


# 示例使用
def replace_text(text):
    # 检查是否运行测试
    if len(sys.argv) > 1 and sys.argv[1] == 'test':
        run_tests()
        sys.exit(0)

    # 命令行模式
    if len(sys.argv) > 1:
        main()
    else:
        # 示例演示
        sample_text = text

        print("示例演示:")
        print("原文:")
        print(sample_text)
        print("\n" + "=" * 50 + "\n")

        processor = TextProcessor(min_length=9)
        processed = processor.process_text(sample_text)
        print("处理后:")
        print(processed)

        processor.print_statistics()

        print("\n使用说明:")
        print("命令行用法:")
        print("  python script.py -f input.txt                    # 处理文件")
        print("  python script.py -t '你的文本内容'               # 直接处理文本")
        print("  python script.py -f input.txt -l 20              # 设置长度阈值为20")
        print("  python script.py -f input.txt -o output.txt      # 输出到文件")
        print("  python script.py -f input.txt -p '。!?' -s      # 自定义标点符号并显示统计")
        print("  python script.py test                            # 运行单元测试")

    return processed



text = """QWERTYUIOP"""


result = replace_text(text)
print(result)
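A compact sketch of using TextProcessor programmatically rather than via the CLI (illustrative only):

# Illustrative only.
from replacestr import TextProcessor

processor = TextProcessor(min_length=20)
sample = "这是一段用于演示的比较长的句子,它会在超过阈值时交换一对相邻字符。"
print(processor.process_text(sample))
processor.print_statistics()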
26
ArticleReplaceBatch/test.py
Normal file
@@ -0,0 +1,26 @@
from get_web_content import toutiao_w_extract_content
from images_edit import download_and_process_images


def get_img(url):

    title, content, images = toutiao_w_extract_content(url)
    print(f"标题: {title}")
    print(f"内容长度: {len(content)}")
    print(f"图片数量: {len(images)}")
    print("图片URLs:")
    for i, img_url in enumerate(images, 1):
        print(f"{i}. {img_url}")

    download_and_process_images(images, "n你好")




# 使用示例
if __name__ == "__main__":
    urls = ["https://www.toutiao.com/article/7533210726036275755/"
            ]

    for i in range(len(urls)):
        get_img(urls[i])
11
ArticleReplaceBatch/toutiao_source_enhanced.html
Normal file
File diff suppressed because one or more lines are too long
390
ArticleReplaceBatch/txt2docx.py
Normal file
@@ -0,0 +1,390 @@
|
||||
import PySimpleGUI as sg
|
||||
import json
|
||||
|
||||
import os
|
||||
import random
|
||||
|
||||
from docx.shared import Pt, RGBColor
|
||||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
|
||||
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
||||
from docx.oxml import OxmlElement
|
||||
from docx.oxml.ns import qn
|
||||
from docx.enum.style import WD_STYLE_TYPE
|
||||
from docx import Document
|
||||
from docx.shared import Inches
|
||||
from PIL import Image
|
||||
|
||||
# 保存文件路径的 JSON 文件
|
||||
SETTINGS_FILE = 'settings.json'
|
||||
|
||||
|
||||
def set_picture_wrapping(paragraph):
|
||||
"""
|
||||
设置图片环绕方式
|
||||
:param paragraph:
|
||||
:return:
|
||||
"""
|
||||
# 设置图片环绕方式为上下环绕
|
||||
pPr = paragraph._element.get_or_add_pPr()
|
||||
framePr = OxmlElement('w:framePr')
|
||||
framePr.set(qn('w:wrap'), 'around')
|
||||
framePr.set(qn('w:vAnchor'), 'text')
|
||||
framePr.set(qn('w:hAnchor'), 'text')
|
||||
pPr.append(framePr)
|
||||
|
||||
|
||||
def format_word_document(input_filename, output_filename):
|
||||
# 打开文档
|
||||
doc = Document(input_filename)
|
||||
|
||||
# 创建或更新标题样式
|
||||
style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH)
|
||||
style.font.name = '黑体'
|
||||
style.font.size = Pt(22) # 二号字
|
||||
style.font.color.rgb = RGBColor(0, 0, 255) # 蓝色
|
||||
style.paragraph_format.space_after = Pt(12) # 标题后间距
|
||||
# 创建或更新正文样式
|
||||
style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH)
|
||||
style.font.name = '仿宋'
|
||||
style.font.size = Pt(14) # 四号字
|
||||
style.paragraph_format.first_line_indent = Pt(20) # 首行缩进两字符
|
||||
style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
|
||||
style.paragraph_format.line_spacing = 1.5 # 行间距
|
||||
style.paragraph_format.space_before = Pt(6) # 段前间距
|
||||
style.paragraph_format.space_after = Pt(6) # 段后间距
|
||||
|
||||
# 遍历所有段落
|
||||
for paragraph in doc.paragraphs:
|
||||
# 设置标题格式
|
||||
if paragraph.style.name.startswith('Heading'):
|
||||
paragraph.style = doc.styles['CustomHeading']
|
||||
|
||||
# 设置段落格式
|
||||
else:
|
||||
paragraph.style = doc.styles['CustomBody']
|
||||
|
||||
# 遍历所有图片
|
||||
for rel in doc.part.rels.values():
|
||||
if "image" in rel.target_ref:
|
||||
# 获取图片所在的段落
|
||||
for paragraph in doc.paragraphs:
|
||||
for run in paragraph.runs:
|
||||
if run._element.tag.endswith('}pict'):
|
||||
# 设置图片居中
|
||||
paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
||||
# 设置图片环绕方式为上下环绕
|
||||
set_picture_wrapping(paragraph)
|
||||
paragraph.paragraph_format.space_before = Pt(12)
|
||||
paragraph.paragraph_format.space_after = Pt(12)
|
||||
|
||||
# output_filename = remove_book_titles(output_filename)
|
||||
|
||||
# 保存文档
|
||||
doc.save(output_filename)
|
||||
|
||||
|
||||
def crop_and_replace_images(folder_path):
|
||||
"""
|
||||
修改图片尺寸
|
||||
:param folder_path:
|
||||
:return:
|
||||
"""
|
||||
folder_path = folder_path.strip()
|
||||
# 遍历文件夹中的所有文件
|
||||
if not os.path.exists(folder_path):
|
||||
os.mkdir(folder_path)
|
||||
else:
|
||||
for filename in os.listdir(folder_path):
|
||||
if os.path.exists(filename):
|
||||
# 检查文件扩展名是否为图片格式
|
||||
if filename.lower().endswith(('.jpg','.png')):
|
||||
# 拼接完整的文件路径
|
||||
file_path = os.path.join(folder_path, filename)
|
||||
print("文件夹路径:" + folder_path)
|
||||
print("文件路径:" + file_path)
|
||||
# 打开图片
|
||||
with Image.open(file_path) as img:
|
||||
# 获取图片的尺寸
|
||||
width, height = img.size
|
||||
# 裁剪图片,裁剪下方10px
|
||||
cropped_img = img.crop((0, 0, width, height - (height * 0.2)))
|
||||
# 保存裁剪后的图片,覆盖原文件
|
||||
output_path = file_path[0:file_path.find('.')] + '.png'
|
||||
cropped_img.save(output_path, 'PNG')
|
||||
|
||||
|
||||
def split_text_into_paragraphs(text):
|
||||
"""
|
||||
将文本分割成段落,并在每个段落之间加一个空行
|
||||
:param text: 输入的文本
|
||||
:return: 段落列表
|
||||
"""
|
||||
paragraphs = text.split('\n\n')
|
||||
# 过滤掉空行和只包含空白字符的段落
|
||||
paragraphs = list(filter(lambda p: p.strip(), paragraphs))
|
||||
|
||||
# 在每个段落之间加一个空行
|
||||
paragraphs_with_blank_lines = []
|
||||
for paragraph in paragraphs:
|
||||
paragraphs_with_blank_lines.append(paragraph)
|
||||
paragraphs_with_blank_lines.append('')
|
||||
|
||||
# 移除最后一个多余的空行
|
||||
if paragraphs_with_blank_lines:
|
||||
paragraphs_with_blank_lines.pop()
|
||||
|
||||
return paragraphs_with_blank_lines
|
||||
|
||||
|
||||
def insert_images_into_paragraphs(paragraphs, image_folder, doc, title):
|
||||
"""
|
||||
将图片插入到段落中
|
||||
:param paragraphs:
|
||||
:param image_folder:
|
||||
:param doc:
|
||||
:return:
|
||||
"""
|
||||
|
||||
if os.path.exists(image_folder):
|
||||
images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
img.lower().endswith(('jpg'))])
|
||||
else:
|
||||
images = []
|
||||
|
||||
# 获取图片列表并排序
|
||||
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
# img.lower().endswith(('jpg'))])
|
||||
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
# # img.lower().endswith(('png', 'jpg', 'jpeg'))])
|
||||
|
||||
total_images = len(images)
|
||||
|
||||
image_index = 0
|
||||
for i, paragraph in enumerate(paragraphs):
|
||||
|
||||
if "正文:" in paragraph:
|
||||
paragraph = paragraph.replace("正文:", '')
|
||||
p = doc.add_paragraph(paragraph)
|
||||
if os.path.exists(image_folder):
|
||||
# 插入图片
|
||||
if image_index < total_images:
|
||||
img_path = images[image_index]
|
||||
|
||||
# 确保图片路径正确且图片文件存在
|
||||
if os.path.exists(img_path):
|
||||
try:
|
||||
with Image.open(img_path) as img:
|
||||
width, height = img.size
|
||||
doc.add_picture(img_path, width=Inches(width / height * 1.5))
|
||||
image_index += 1
|
||||
except Exception as e:
|
||||
print(f"无法识别图像: {img_path}, 错误: {e}")
|
||||
continue
|
||||
else:
|
||||
print(f"图片路径无效: {img_path}")
|
||||
|
||||
|
||||
def create_word_document(text, image_folder, output_path, title):
|
||||
"""
|
||||
创建Word文档
|
||||
:param text:
|
||||
:param image_folder:
|
||||
:param output_path:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
doc = Document()
|
||||
paragraphs = split_text_into_paragraphs(text)
|
||||
insert_images_into_paragraphs(paragraphs, image_folder, doc, title)
|
||||
# modify_document(doc)
|
||||
doc.save(output_path)
|
||||
try:
|
||||
format_word_document(output_path, output_path)
|
||||
except Exception as e:
|
||||
print(f"格式化文档 {output_path} 时出错: {e}")
|
||||
print(f'文档已保存到: {output_path}')
|
||||
except Exception as e:
|
||||
print(f"创建文档 {output_path} 时出错: {e}")
|
||||
|
||||
|
||||
# 读取指定路径下txt文本的内容
|
||||
def read_text_file(file_path):
|
||||
"""
|
||||
读取指定路径下txt文本的内容
|
||||
:param file_path:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
return file.read()
|
||||
except Exception as e:
|
||||
print(f"读取文件 {file_path} 时出错: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def get_file_name(file_path):
|
||||
"""
|
||||
获取文件名
|
||||
:param file_path:
|
||||
:return:
|
||||
"""
|
||||
return os.path.basename(file_path)
|
||||
|
||||
|
||||
def apply_random_style(paragraph):
    """Apply one randomly chosen style (bold / italic / underline / font colour / highlight) to each run."""
    # python-docx only supports highlighting via the fixed WD_COLOR_INDEX palette,
    # not arbitrary RGB values, so the "background" choice below uses that enum
    # (the original light RGB shades are approximated by the nearest palette entries).
    from docx.enum.text import WD_COLOR_INDEX

    # Predefined font colours
    predefined_font_colors = [
        RGBColor(255, 0, 0),    # red
        RGBColor(255, 165, 0),  # orange
        RGBColor(128, 0, 128),  # purple
    ]

    # Predefined highlight colours (kept light so the text stays readable)
    predefined_bg_colors = [
        WD_COLOR_INDEX.GRAY_25,       # light grey
        WD_COLOR_INDEX.YELLOW,        # light yellow
        WD_COLOR_INDEX.BRIGHT_GREEN,  # light green
        WD_COLOR_INDEX.TURQUOISE,     # light cyan
        WD_COLOR_INDEX.PINK,          # light pink
    ]

    # Walk every run in the paragraph (a run is a stretch of text with uniform formatting)
    for run in paragraph.runs:
        # Pick one style at random
        style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background'])

        if style_choice == 'bold':
            run.bold = True
        elif style_choice == 'italic':
            run.italic = True
        elif style_choice == 'underline':
            run.underline = WD_UNDERLINE.SINGLE
        elif style_choice == 'color':
            # Random predefined font colour
            run.font.color.rgb = random.choice(predefined_font_colors)
        elif style_choice == 'background':
            # Random predefined highlight colour
            run.font.highlight_color = random.choice(predefined_bg_colors)


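A quick way to see what apply_random_style does is to run it over a throwaway document. A minimal sketch, assuming it is executed in the same module so the python-docx imports above are available; demo.docx is just an illustrative output name:

doc = Document()
doc.add_paragraph("第一段示例文字")
doc.add_paragraph("第二段示例文字")
for para in doc.paragraphs:
    apply_random_style(para)   # each paragraph added this way holds a single run
doc.save("demo.docx")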
def txt2docx(txt_path, image_path, keep_txt=True):
    """
    Convert every txt file in txt_path into a .docx, inserting the images found
    in the matching sub-folder of image_path.
    :param txt_path: folder containing the txt articles
    :param image_path: folder containing one image sub-folder per article
    :param keep_txt: whether to keep the original txt files after conversion
    """
    from pathlib import Path

    file_path = txt_path
    try:
        txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path)
                       if txt.lower().endswith('txt')])
    except Exception as e:
        print(f"读取文件夹 {file_path} 时出错: {e}")
        sg.popup_error(f"读取文件夹 {file_path} 时出错: {e}")
        return

    img_path = Path(image_path)

    for txt in txts:
        try:
            print("正在修改:" + txt)
            text = read_text_file(txt)
            if not text:  # skip this file if it could not be read
                print(f"跳过文件: {txt} (读取失败)")
                continue

            txt_name = get_file_name(txt)
            title_name = txt_name.replace(".txt", "")
            print(title_name)

            # Strip the "正文:" marker and any markdown fences from the body text
            if "正文:" in text:
                new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
            else:
                new_text = text.replace("```markdown", "").replace("```", "")
            content = new_text

            # The images for this article live in a sub-folder named after the txt file
            image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")

            # crop_and_replace_images(image_folder)

            create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)

            # Delete the original txt file unless the user chose to keep it
            if not keep_txt:
                try:
                    os.remove(txt)
                    print(f"已删除原始文件: {txt}")
                except Exception as e:
                    print(f"删除文件 {txt} 时出错: {e}")
            else:
                print(f"保留原始文件: {txt}")
        except Exception as e:
            print(f"处理文件 {txt} 时出错: {e}")
            continue  # move on to the next file


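txt2docx expects one image sub-folder per article, named after the txt file without its extension. A small sketch of how the paths are resolved (the folder names are purely illustrative):

from pathlib import Path

txt_file = r"D:\articles\我的文章.txt"            # illustrative input
image_root = Path(r"D:\images")

stem = Path(txt_file).name.replace(".txt", "").rstrip(".")
image_folder = image_root / stem                  # -> D:\images\我的文章
docx_file = txt_file.replace(".txt", ".docx")     # -> D:\articles\我的文章.docx
print(image_folder, docx_file)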
# Load saved settings
def load_settings():
    if os.path.exists(SETTINGS_FILE):
        with open(SETTINGS_FILE, 'r') as f:
            return json.load(f)
    return {'folder1': '', 'folder2': ''}


# Save settings
def save_settings(settings):
    with open(SETTINGS_FILE, 'w') as f:
        json.dump(settings, f)


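The settings file written by save_settings is a flat JSON object keyed by folder1, folder2 and keep_txt. A sketch, assuming it runs in the same module (the folder paths are illustrative):

example_settings = {
    "folder1": r"D:\articles",   # article (txt) folder
    "folder2": r"D:\images",     # image folder
    "keep_txt": True,
}
save_settings(example_settings)  # writes SETTINGS_FILE
print(load_settings())           # -> the same dict on the next start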
# Handle the folders picked by the user
def process_folders(folder1, folder2, keep_txt=True):
    # Make sure both folders exist
    if not os.path.exists(folder1):
        sg.popup_error(f"文章文件夹不存在: {folder1}")
        return
    if not os.path.exists(folder2):
        sg.popup_error(f"图片文件夹不存在: {folder2}")
        return

    # Run the actual conversion
    try:
        txt2docx(folder1, folder2, keep_txt)
        sg.popup("处理完成!")
    except Exception as e:
        sg.popup_error(f"处理过程中出错: {e}")


# Load the previously saved settings
settings = load_settings()
if 'keep_txt' not in settings:
    settings['keep_txt'] = True

# Window layout
layout = [
    [sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()],
    [sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()],
    [sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')],
    [sg.Button('确认'), sg.Button('取消')]
]

# Create the window
window = sg.Window('文件夹选择窗口', layout)

# Event loop
while True:
    event, values = window.read()
    if event == sg.WIN_CLOSED or event == '取消':  # window closed or "取消" clicked
        break
    elif event == '确认':  # "确认" clicked
        folder1 = values[0]
        folder2 = values[1]
        keep_txt = values['keep_txt']
        process_folders(folder1, folder2, keep_txt)
        # Remember the chosen folders and the keep-txt option
        settings['folder1'] = folder1
        settings['folder2'] = folder2
        settings['keep_txt'] = keep_txt
        save_settings(settings)

# Close the window
window.close()
1389
ArticleReplaceBatch/txt2md2docx.py
Normal file
File diff suppressed because it is too large
170
ArticleReplaceBatch/utils.py
Normal file
@@ -0,0 +1,170 @@
import json
import logging
import os
import re
from typing import Tuple

import pandas as pd
import requests

from config import *


def text_detection(text):
    """
    Check text for policy violations with the Baidu text-censor API.
    :param text: text to check
    :return: the conclusion field from the API response
    """
    url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token=" + get_baidu_access_token()
    payload = 'text=' + text
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    content = str(response.text)
    data = json.loads(content)
    print(data)
    conclusion = data['conclusion']
    return conclusion


def get_baidu_access_token():
    """
    Generate an auth token (Access Token) from the Baidu AK/SK pair.
    :return: access_token, or None on failure
    """
    API_KEY = CONFIG['Baidu']['api_key']
    SECRET_KEY = CONFIG['Baidu']['secret_key']

    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    return str(requests.post(url, params=params).json().get("access_token"))


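A minimal usage sketch for the two helpers above, assuming valid Baidu api_key/secret_key entries in the config; treating '合规' as the pass verdict matches the usual Baidu text-censor response but is stated here as an assumption:

sample = "这是一段用于测试的文字"          # illustrative input
conclusion = text_detection(sample)
if conclusion == "合规":                   # assumed pass verdict string
    print("文本通过审核")
else:
    print(f"文本未通过审核: {conclusion}")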
def safe_filename(filename):
    """
    Sanitise a file name by removing or replacing unsafe characters.
    """
    # Replace characters that the Windows file system does not allow
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        filename = filename.replace(char, '_')
    # Strip leading/trailing spaces and dots
    filename = filename.strip('. ')
    # Fall back to a default name if nothing usable is left
    if not filename:
        filename = 'untitled'
    return filename


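For example, safe_filename replaces the characters Windows rejects and falls back to a default when nothing usable is left (the inputs below are made up):

print(safe_filename('AI: why/how?'))   # -> 'AI_ why_how_'
print(safe_filename('   ...   '))      # -> 'untitled'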
def safe_open_directory(directory_path):
    """
    Create a directory safely: normalise the path and create all missing parent directories.
    """
    try:
        # Normalise the path
        directory_path = os.path.normpath(directory_path)
        if not os.path.exists(directory_path):
            os.makedirs(directory_path, exist_ok=True)
            os.chmod(directory_path, 0o777)
    except Exception as e:
        # Log the failure and re-raise
        logging.error(f"创建目录失败: {e}")
        raise


def check_keywords_in_text(text):
    """
    Check whether the text contains any banned words.
    :param text: text to check
    :return: True if a banned word is found, otherwise False
    """
    keywords = CONFIG['Keywords']['banned_words'].split(',')
    for keyword in keywords:
        if keyword.strip() in text:
            return True
    return False


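The banned words come from the [Keywords] section of the config as one comma-separated string. A standalone sketch of the same check with an inline list (the sample words are placeholders, not the project's real list):

banned_words = "违禁词A,违禁词B,违禁词C"      # stands in for CONFIG['Keywords']['banned_words']
keywords = [w.strip() for w in banned_words.split(',')]
text = "这段文字包含违禁词B"
print(any(k in text for k in keywords))       # -> True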
def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """
    Take the text from the start up to (and including) the first punctuation mark.

    :param text: input text
    :param punctuations: regex pattern of punctuation marks, default ",", "。", "!", "?", ";"
    :return: the extracted fragment
    """
    # Find the position of the first punctuation mark
    match = re.search(punctuations, text)

    if match:
        # Found one: keep everything up to and including it
        return text[:match.end()].strip()
    else:
        # No punctuation found: return the whole text
        return text.strip()


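With the default punctuation set, the function returns the first clause of a sentence, punctuation mark included:

print(extract_content_until_punctuation("今天天气不错,适合出门散步。"))  # -> 今天天气不错,
print(extract_content_until_punctuation("没有标点的一段话"))              # -> 没有标点的一段话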
# Read the link column and the type column from an Excel sheet and return them as a list of tuples
def read_excel(file_name):
    datas = pd.read_excel(file_name)
    first_column_name = datas.columns[0]  # link column (always the first column)
    type_column_name = '领域'  # type column

    links = datas[first_column_name].tolist()
    # Use the type column if it exists, otherwise fall back to the default type
    types = datas[type_column_name].tolist() if type_column_name in datas.columns else ['默认'] * len(links)

    # Pair each link with its type
    result = list(zip(links, types))
    print(result)

    return result


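read_excel assumes the links sit in the first column and that an optional 领域 column carries the type. A sketch that builds such a sheet and feeds it back in, assuming it runs in the same module (file name and column values are made up; writing .xlsx needs openpyxl):

import pandas as pd

df = pd.DataFrame({
    "链接": ["https://example.com/a", "https://example.com/b"],  # first column = links
    "领域": ["科技", "财经"],                                     # optional type column
})
df.to_excel("links.xlsx", index=False)

print(read_excel("links.xlsx"))
# -> [('https://example.com/a', '科技'), ('https://example.com/b', '财经')]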
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Enhanced version: resolve duplicate file names inside a folder, covering more complex cases.

    Args:
        folder_path: folder to check
        filename: original file name

    Returns:
        Tuple[str, bool]: (resolved file name, whether it was renamed)
    """
    # Sanitise illegal characters in the file name first
    filename = safe_filename(filename)

    base, ext = os.path.splitext(filename)
    target_path = os.path.join(folder_path, filename)

    if not os.path.exists(target_path):
        return filename, False

    existing_files = set(os.listdir(folder_path))
    pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))

    # Collect the numeric suffixes of all matching files
    numbers = []
    for f in existing_files:
        match = pattern.match(f)
        if match:
            num = int(match.group(2)) if match.group(2) else 0
            numbers.append(num)

    next_num = max(numbers) + 1 if numbers else 1
    new_filename = f"{base}_{next_num}{ext}"

    # Make sure the new name is not taken either (guards against races)
    while new_filename in existing_files:
        next_num += 1
        new_filename = f"{base}_{next_num}{ext}"

    return new_filename, True
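A usage sketch for handle_duplicate_files_advanced, assuming it runs in the same module (folder and file names are illustrative): if report.docx already exists in the folder, the helper proposes report_1.docx and reports that it renamed.

folder = r"D:\output"
name, renamed = handle_duplicate_files_advanced(folder, "report.docx")
if renamed:
    print(f"已重命名为: {name}")        # e.g. report_1.docx
else:
    print(f"文件名可直接使用: {name}")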
24
LICENSE
@@ -1,24 +0,0 @@
Software License for MTL

Copyright (c) 2007 The Trustees of Indiana University.
              2008 Dresden University of Technology and the Trustees of Indiana University.
              2010 SimuNova UG (haftungsbeschränkt), www.simunova.com.
All rights reserved.
Authors: Peter Gottschling and Andrew Lumsdaine

This file is part of the Matrix Template Library

Dresden University of Technology -- short TUD -- and Indiana University -- short IU -- have the exclusive rights to license this product under the following license.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. All redistributions of source code must retain the above copyright notice, the list of authors in the original source code, this list of conditions and the disclaimer listed in this license;
2. All redistributions in binary form must reproduce the above copyright notice, this list of conditions and the disclaimer listed in this license in the documentation and/or other materials provided with the distribution;
3. Any documentation included with all redistributions must include the following acknowledgement:
"This product includes software developed at the University of Notre Dame, the Pervasive Technology Labs at Indiana University, and Dresden University of Technology. For technical information contact Andrew Lumsdaine at the Pervasive Technology Labs at Indiana University. For administrative and license questions contact the Advanced Research and Technology Institute at 1100 Waterway Blvd. Indianapolis, Indiana 46202, phone 317-274-5905, fax 317-274-5902."
Alternatively, this acknowledgement may appear in the software itself, and wherever such third-party acknowledgments normally appear.
4. The name "MTL" shall not be used to endorse or promote products derived from this software without prior written permission from IU or TUD. For written permission, please contact Indiana University Advanced Research & Technology Institute.
5. Products derived from this software may not be called "MTL", nor may "MTL" appear in their name, without prior written permission of Indiana University Advanced Research & Technology Institute.

TUD and IU provide no reassurances that the source code provided does not infringe the patent or any other intellectual property rights of any other entity. TUD and IU disclaim any liability to any recipient for claims brought by any other entity based on infringement of intellectual property rights or otherwise.

LICENSEE UNDERSTANDS THAT SOFTWARE IS PROVIDED "AS IS" FOR WHICH NO WARRANTIES AS TO CAPABILITIES OR ACCURACY ARE MADE. DRESDEN UNIVERSITY OF TECHNOLOGY AND INDIANA UNIVERSITY GIVE NO WARRANTIES AND MAKE NO REPRESENTATION THAT SOFTWARE IS FREE OF INFRINGEMENT OF THIRD PARTY PATENT, COPYRIGHT, OR OTHER PROPRIETARY RIGHTS. DRESDEN UNIVERSITY OF TECHNOLOGY AND INDIANA UNIVERSITY MAKE NO WARRANTIES THAT SOFTWARE IS FREE FROM "BUGS", "VIRUSES", "TROJAN HORSES", "TRAP DOORS", "WORMS", OR OTHER HARMFUL CODE. LICENSEE ASSUMES THE ENTIRE RISK AS TO THE PERFORMANCE OF SOFTWARE AND/OR ASSOCIATED MATERIALS, AND TO THE PERFORMANCE AND VALIDITY OF INFORMATION GENERATED USING SOFTWARE.
7
exeadd/exe_packer/config.json
Normal file
@@ -0,0 +1,7 @@
{
  "host": "taiyiagi.xyz",
  "port": 3306,
  "user": "taiyi",
  "password": "taiyi1224",
  "database": "license_system"
}
13
exeaddpassword/.env
Normal file
@@ -0,0 +1,13 @@
# Database configuration
MYSQL_HOST=taiyiagi.xyz
MYSQL_PORT=3306
MYSQL_USER=taiyi
MYSQL_PASSWORD=taiyi1224
MYSQL_DATABASE=exe_licensing

# Encryption key (32 bytes, for AES-256)
ENCRYPTION_KEY=your-32-char-secret-key-here-123456

# Debug mode
DEBUG=false
VERBOSE=false
4
file-receive-system/db.py
Normal file
@@ -0,0 +1,4 @@
from flask_sqlalchemy import SQLAlchemy

# Create the database instance independently of the application
db = SQLAlchemy()