Compare commits
10 Commits
bd0c6a6ff0
...
3b305f1d72
| Author | SHA1 | Date | |
|---|---|---|---|
| 3b305f1d72 | |||
| 42fc2e661f | |||
| 0792027bea | |||
| 036eb90ca0 | |||
| 5397e7cfc2 | |||
|
|
666494c0c2 | ||
|
|
113c97c887 | ||
|
|
2d377de6fd | ||
|
|
65618b2c0b | ||
|
|
b9c1a797cb |
1563
ArticleReplaceBatch/ArticleReplaceDifyBatchWTT.py
Normal file
1563
ArticleReplaceBatch/ArticleReplaceDifyBatchWTT.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -13,6 +13,7 @@ def call_dify_workflow(input_data):
|
||||
:param input_data: 传递给工作流的输入数据
|
||||
:return: 工作流的输出结果
|
||||
"""
|
||||
logger.info("Dify开始工作。。。")
|
||||
api_key = CONFIG['Dify']['api_key']
|
||||
user_id = CONFIG['Dify']['user_id']
|
||||
url = CONFIG['Dify']['url']
|
||||
@ -35,3 +36,138 @@ def call_dify_workflow(input_data):
|
||||
# print("article:", article)
|
||||
return article
|
||||
|
||||
|
||||
# ==========================调用coze工作流==========================
|
||||
|
||||
|
||||
def call_coze_workflow(parameters):
|
||||
"""
|
||||
调用 Coze 工作流的函数
|
||||
|
||||
:param parameters: 传递给工作流的输入参数(字典格式)
|
||||
:return: 工作流的执行结果
|
||||
"""
|
||||
logger.info("Coze开始工作。。。。")
|
||||
workflow_id = CONFIG['Coze']['workflow_id']
|
||||
access_token = CONFIG['Coze']['access_token']
|
||||
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
|
||||
|
||||
url = "https://api.coze.cn/v1/workflow/run"
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
data = {
|
||||
"workflow_id": workflow_id,
|
||||
"parameters": parameters,
|
||||
"is_async": is_async
|
||||
}
|
||||
|
||||
response = requests.post(url, json=data, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
# data = json.loads(response.text)['data']
|
||||
# print("data:",data['output'])
|
||||
|
||||
return response.text
|
||||
else:
|
||||
return {
|
||||
"error": f"请求失败,状态码:{response.status_code}",
|
||||
"detail": response.text
|
||||
}
|
||||
|
||||
|
||||
def call_coze_article_workflow(parameters):
|
||||
"""
|
||||
调用 Coze 工作流的函数
|
||||
|
||||
:param parameters: 传递给工作流的输入参数(字典格式)
|
||||
:param is_async: 是否异步执行(默认 False)
|
||||
:return: 工作流的执行结果
|
||||
"""
|
||||
|
||||
workflow_id = CONFIG['Coze']['workflow_id']
|
||||
access_token = CONFIG['Coze']['access_token']
|
||||
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
|
||||
url = "https://api.coze.cn/v1/workflow/run"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
data = {
|
||||
"workflow_id": workflow_id,
|
||||
"parameters": parameters,
|
||||
"is_async": is_async
|
||||
}
|
||||
|
||||
response = requests.post(url, json=data, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
# data = json.loads(response.text)['data']
|
||||
# print("data:",data['output'])
|
||||
import ast
|
||||
|
||||
# 直接解析整个result字符串
|
||||
result_dict = ast.literal_eval(response.text)
|
||||
|
||||
# 解析data字段
|
||||
data_dict = ast.literal_eval(result_dict['data'])
|
||||
|
||||
# 获取output的值
|
||||
output_value = data_dict['output']
|
||||
|
||||
return output_value
|
||||
else:
|
||||
return {
|
||||
"error": f"请求失败,状态码:{response.status_code}",
|
||||
"detail": response.text
|
||||
}
|
||||
|
||||
|
||||
def call_coze_all_article_workflow(parameters,is_async=False):
|
||||
"""
|
||||
调用 Coze 工作流的函数
|
||||
|
||||
:param parameters: 传递给工作流的输入参数(字典格式)
|
||||
:param is_async: 是否异步执行(默认 False)
|
||||
:return: 工作流的执行结果
|
||||
"""
|
||||
workflow_id = CONFIG['Coze']['workflow_id']
|
||||
access_token = CONFIG['Coze']['access_token']
|
||||
is_async = CONFIG['Coze']['is_async'].lower() == 'False'
|
||||
url = "https://api.coze.cn/v1/workflow/run"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
data = {
|
||||
"workflow_id": workflow_id,
|
||||
"parameters": parameters,
|
||||
"is_async": is_async
|
||||
}
|
||||
|
||||
response = requests.post(url, json=data, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
# data = json.loads(response.text)['data']
|
||||
# print("data:",data['output'])
|
||||
import ast
|
||||
|
||||
# 直接解析整个result字符串
|
||||
result_dict = ast.literal_eval(response.text)
|
||||
print(result_dict)
|
||||
|
||||
# 解析data字段
|
||||
data_dict = ast.literal_eval(result_dict['data'])
|
||||
|
||||
# 获取output的值
|
||||
title = data_dict['title']
|
||||
article = data_dict['article']
|
||||
return title, article
|
||||
else:
|
||||
return {
|
||||
"error": f"请求失败,状态码:{response.status_code}",
|
||||
"detail": response.text
|
||||
}
|
||||
|
||||
@ -0,0 +1,12 @@
|
||||
|
||||
|
||||
你绝对想不到!江苏王女士最近收到电费单时惊了,夏天每月电费突然涨到800元。她翻出家里所有电器,连路由器都拔了,结果第二个月电费反而涨到900块!
|
||||
|
||||
据《现代快报》报道,供电局工作人员上门检查后才发现,罪魁祸首是待机状态的空调。王女士家3台空调插头都没拔,每月能白白耗掉200多度电。这事让不少网友直呼"活久见",有人留言:"我家电视常年插着电源,难怪电费总降不下来!"
|
||||
|
||||
其实国家电网早做过测试,普通家电待机功率在13瓦之间。按每天待机20小时算,光机顶盒一年就能吃掉30度电。更扎心的是,很多家庭至少有5台电器长期插着电,一年下来相当于白交三百块!
|
||||
|
||||
我特意翻出家里老电表,发现拔掉所有插头后,电表真的转得慢了。现在我家冰箱外的电器用完就拔,这个月省了五十多电费。你家电表跑得快吗?赶紧试试拔插头吧!
|
||||
|
||||
生活窍门 家庭用电 省电妙招 居家过日子
|
||||
你家最近电费有变化吗?评论区聊聊你的省电妙招吧!
|
||||
@ -0,0 +1,11 @@
|
||||
|
||||
|
||||
上海垃圾分类新规实施半个月,罚款总额突破200万!据东方网报道,光是黄浦区就开出了2.3万张罚单,平均每分钟都有居民被处罚。我家楼下王阿姨前天刚被罚了50块,就因为在垃圾站门口多站了半分钟。
|
||||
|
||||
可你绝对想不到,全市60%的罚款都集中在3个高档小区。这些小区明明配置了智能分类设备,还有专人指导,结果反而成了"重灾区"。隔壁张叔气得直拍大腿:"我天天在家分拣半小时,最后还因为垃圾袋颜色不对被罚!"
|
||||
|
||||
据环保局数据显示,新规实施后厨余垃圾分拣正确率反而下降了5%。这事真不能全怪老百姓,有些小区督导员自己都搞不清分类标准。我亲眼见过督导员把干电池扔进有害垃圾箱,那可是要扣分的啊!
|
||||
|
||||
不过话说回来,垃圾分类确实是利国利民的好事。关键是不能"一刀切",得给大伙儿适应时间。听说杭州试点"三次提醒再罚款"的模式,效果反而更好。这事您怎么看?您家小区垃圾分类顺利吗?
|
||||
|
||||
垃圾分类新规 罚款争议 上海热点 社区管理 民生政策
|
||||
@ -15,6 +15,14 @@ DEFAULT_CONFIG = {
|
||||
"title_file": "文章链接.xlsx",
|
||||
"max_threads": "3"
|
||||
},
|
||||
"Coze": {
|
||||
"workflow_id": "",
|
||||
"access_token": "",
|
||||
"is_async": "false",
|
||||
"input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
|
||||
"last_used_template": "",
|
||||
"last_used_template_type": "文章"
|
||||
},
|
||||
"Database": {
|
||||
"host": "27.106.125.150",
|
||||
"user": "root",
|
||||
|
||||
@ -10,6 +10,8 @@ from config import *
|
||||
from utils import safe_open_directory
|
||||
|
||||
IMGS_BASE_PATH = CONFIG['General']['images_path']
|
||||
|
||||
|
||||
def crop_and_replace_images(folder_path):
|
||||
"""
|
||||
修改图片尺寸
|
||||
@ -81,15 +83,25 @@ def download_image(image_url, save_path):
|
||||
print(f"请求出错:{e}")
|
||||
|
||||
|
||||
def download_and_process_images(img_urls, article_title):
|
||||
def download_and_process_images(img_urls, article_title, save_dir=None):
|
||||
"""
|
||||
下载并处理图片
|
||||
:param img_urls: 图片URL列表
|
||||
:param article_title: 文章标题
|
||||
:param save_dir: 自定义保存目录,如果为None则使用默认目录
|
||||
"""
|
||||
img_dir_path = os.path.join(IMGS_BASE_PATH, article_title)
|
||||
if save_dir is None:
|
||||
save_dir = IMGS_BASE_PATH
|
||||
|
||||
img_dir_path = os.path.join(str(save_dir), str(article_title))
|
||||
logger.info(f"图片保存路径:{img_dir_path}")
|
||||
safe_open_directory(img_dir_path)
|
||||
|
||||
for i, img_url in enumerate(img_urls):
|
||||
imgurl = "https:" + img_url
|
||||
if img_url.startswith("https"):
|
||||
imgurl = img_url
|
||||
else:
|
||||
imgurl = "https:"+img_url
|
||||
img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
|
||||
try:
|
||||
download_image(imgurl, img_path)
|
||||
|
||||
@ -1,209 +0,0 @@
|
||||
import pandas as pd
|
||||
import getpass
|
||||
import sys # 导入sys模块
|
||||
import threading
|
||||
import queue
|
||||
|
||||
|
||||
from ai_studio import call_dify_workflow
|
||||
from databases import *
|
||||
|
||||
|
||||
from images_edit import download_and_process_images
|
||||
from utils import *
|
||||
from get_web_content import *
|
||||
from config import *
|
||||
|
||||
# ==============================主程序===========================
|
||||
def process_link(link):
|
||||
"""
|
||||
处理单个链接
|
||||
"""
|
||||
try:
|
||||
title_text, article_text, img_urls = "","",[]
|
||||
if str(link).startswith("https://www.toutiao.com/w"):
|
||||
title_text, article_text, img_urls = toutiao_w_extract_content(link)
|
||||
elif str(link).startswith("https://www.toutiao.com/article/"):
|
||||
title_text, article_text, img_urls = toutiao_extract_content(link)
|
||||
else:
|
||||
title_text, article_text, img_urls = "", "", []
|
||||
|
||||
|
||||
|
||||
# 获取数据库配置
|
||||
host = CONFIG['Database']['host']
|
||||
user = CONFIG['Database']['user']
|
||||
password = CONFIG['Database']['password']
|
||||
database = CONFIG['Database']['database']
|
||||
|
||||
# 判断文章内容是否有违禁词
|
||||
check_keywords = check_keywords_in_text(title_text)
|
||||
|
||||
if check_keywords:
|
||||
print("文章中有违禁词!")
|
||||
check_link_insert(host, user, password, database, link)
|
||||
return
|
||||
|
||||
title = extract_content_until_punctuation(article_text).replace("正文:", "")
|
||||
|
||||
print(title)
|
||||
print(article_text)
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
# 获取当前时间并格式化
|
||||
current_time = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
# 打印当前时间
|
||||
print("当前时间:", current_time)
|
||||
|
||||
input_data = {
|
||||
"old_article": article_text
|
||||
}
|
||||
|
||||
message_content = call_dify_workflow(input_data)
|
||||
# 获取当前时间并格式化
|
||||
current_time = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
# 打印当前时间
|
||||
print("当前时间:", current_time)
|
||||
|
||||
finally_article = message_content.replace("正文:", "") + "\n"
|
||||
|
||||
article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt")
|
||||
|
||||
if '*' in finally_article or '#' in finally_article or "-" in finally_article:
|
||||
# 使用正则表达式一次性替换多个字符
|
||||
old_content = re.sub(r'[*#-]', '', message_content)
|
||||
else:
|
||||
# 如果不需要替换,直接使用原内容
|
||||
old_content = finally_article
|
||||
|
||||
print("改写完成的文章:" + old_content)
|
||||
|
||||
# 删除AI词汇
|
||||
content = old_content
|
||||
|
||||
check_link_insert(host, user, password, database, link)
|
||||
|
||||
# 判断文章合规度
|
||||
if text_detection(content) == "合规":
|
||||
print("文章合规")
|
||||
pass
|
||||
else:
|
||||
print("文章不合规")
|
||||
return
|
||||
|
||||
with open(article_save_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
logging.info('文本已经保存')
|
||||
|
||||
if img_urls:
|
||||
download_and_process_images(img_urls, title)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"处理链接 {link} 时出错: {e}")
|
||||
raise
|
||||
|
||||
|
||||
|
||||
def link_to_text(prompt1=None, prompt2=None, num_threads=None):
|
||||
use_link_path = 'use_link_path.txt'
|
||||
|
||||
# 读取链接
|
||||
links = read_excel(TITLE_BASE_PATH)
|
||||
|
||||
# 过滤已处理的链接
|
||||
filtered_links = []
|
||||
host = CONFIG['Database']['host']
|
||||
user = CONFIG['Database']['user']
|
||||
password = CONFIG['Database']['password']
|
||||
database = CONFIG['Database']['database']
|
||||
|
||||
for link in links:
|
||||
logging.info(f"总共{len(links)}个链接")
|
||||
if check_link_exists(host, user, password, database, link):
|
||||
logger.info(f"链接已存在: {link}")
|
||||
continue
|
||||
else:
|
||||
filtered_links.append(link)
|
||||
logger.info(f"链接不存在: {link}")
|
||||
print("链接不存在,存储到过滤器中:",link)
|
||||
|
||||
if not filtered_links:
|
||||
logger.info("没有新链接需要处理")
|
||||
return []
|
||||
|
||||
# 使用多线程处理链接
|
||||
results = process_links_with_threads(filtered_links, num_threads)
|
||||
|
||||
# 记录已处理的链接
|
||||
with open(use_link_path, 'a+', encoding='utf-8') as f:
|
||||
for link, success, _ in results:
|
||||
if success:
|
||||
f.write(link + "\n")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# 创建一个任务队列和结果队列
|
||||
task_queue = queue.Queue()
|
||||
result_queue = queue.Queue()
|
||||
|
||||
|
||||
# 工作线程函数
|
||||
def worker():
|
||||
while True:
|
||||
try:
|
||||
# 从队列中获取任务
|
||||
link = task_queue.get()
|
||||
if link is None: # 结束信号
|
||||
break
|
||||
|
||||
# 处理链接
|
||||
try:
|
||||
process_link(link)
|
||||
result_queue.put((link, True, None)) # 成功
|
||||
except Exception as e:
|
||||
result_queue.put((link, False, str(e))) # 失败
|
||||
logger.error(f"处理链接 {link} 时出错: {e}")
|
||||
|
||||
# 标记任务完成
|
||||
task_queue.task_done()
|
||||
except Exception as e:
|
||||
logger.error(f"工作线程出错: {e}")
|
||||
|
||||
|
||||
# 多线程处理链接
|
||||
def process_links_with_threads(links, num_threads=None):
|
||||
if num_threads is None:
|
||||
num_threads = min(MAX_THREADS, len(links))
|
||||
else:
|
||||
num_threads = min(num_threads, MAX_THREADS, len(links))
|
||||
|
||||
# 创建工作线程
|
||||
threads = []
|
||||
for _ in range(num_threads):
|
||||
t = threading.Thread(target=worker)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
# 添加任务到队列
|
||||
for link in links:
|
||||
task_queue.put(link)
|
||||
|
||||
# 添加结束信号
|
||||
for _ in range(num_threads):
|
||||
task_queue.put(None)
|
||||
|
||||
# 等待所有线程完成
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# 处理结果
|
||||
results = []
|
||||
while not result_queue.empty():
|
||||
results.append(result_queue.get())
|
||||
|
||||
return results
|
||||
299
ArticleReplaceBatch/main_process_wtt.py
Normal file
299
ArticleReplaceBatch/main_process_wtt.py
Normal file
@ -0,0 +1,299 @@
|
||||
import threading
|
||||
import queue
|
||||
import json # 导入 json 模块
|
||||
|
||||
from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow
|
||||
from databases import *
|
||||
|
||||
from images_edit import download_and_process_images
|
||||
from utils import *
|
||||
from get_web_content import *
|
||||
from config import *
|
||||
|
||||
|
||||
# ==============================主程序===========================
|
||||
def process_link(link_info, ai_service, current_template=None,generation_type=None):
|
||||
link, article_type = link_info # 解包链接和类型信息
|
||||
"""
|
||||
处理单个链接
|
||||
:param link: 要处理的链接
|
||||
:param ai_service: AI服务提供商,可选值:dify, coze
|
||||
:param current_template: 当前选择的模板配置
|
||||
"""
|
||||
try:
|
||||
if link.startswith("https://www.toutiao.com"):
|
||||
title_text, article_text, img_urls = toutiao_w_extract_content(link)
|
||||
if title_text == "":
|
||||
title_text, article_text, img_urls = toutiao_extract_content(link)
|
||||
elif link.startswith("https://mp.weixin.qq.co"):
|
||||
title_text, article_text, img_urls = wechat_extract_content(link)
|
||||
elif link.startswith("https://www.163.com"):
|
||||
title_text, article_text, img_urls = wangyi_extract_content(link)
|
||||
else:
|
||||
title_text, article_text, img_urls = "", "", []
|
||||
|
||||
if title_text == "":
|
||||
return
|
||||
elif len(title_text) > 100:
|
||||
return
|
||||
|
||||
# 获取数据库配置
|
||||
host = CONFIG['Database']['host']
|
||||
user = CONFIG['Database']['user']
|
||||
password = CONFIG['Database']['password']
|
||||
database = CONFIG['Database']['database']
|
||||
|
||||
# 判断文章内容是否有违禁词
|
||||
check_keywords = check_keywords_in_text(title_text)
|
||||
|
||||
title = extract_content_until_punctuation(article_text).replace("正文:", "")
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
# 获取当前时间并格式化
|
||||
current_time = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
# 打印当前时间
|
||||
print("当前时间:", current_time)
|
||||
|
||||
if ai_service == "dify":
|
||||
if check_keywords:
|
||||
print("文章中有违禁词!")
|
||||
check_link_insert(host, user, password, database, link)
|
||||
return
|
||||
# 从配置加载 input_data 模板
|
||||
input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}')
|
||||
try:
|
||||
# 解析模板字符串为字典
|
||||
input_data_template = json.loads(input_data_template_str)
|
||||
# 使用实际变量格式化模板
|
||||
input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
|
||||
except (json.JSONDecodeError, KeyError, AttributeError) as e:
|
||||
logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.")
|
||||
input_data = {
|
||||
"old_article": article_text
|
||||
}
|
||||
|
||||
# input_data = {
|
||||
# "old_article": article_text
|
||||
# }
|
||||
message_content = call_dify_workflow(input_data)
|
||||
elif ai_service == "coze":
|
||||
logger.info("coze正在处理")
|
||||
logger.info(f"正在处理的文章类型为:{generation_type}")
|
||||
# 如果有模板配置,临时更新CONFIG
|
||||
original_config = None
|
||||
if current_template:
|
||||
original_config = {
|
||||
'workflow_id': CONFIG['Coze']['workflow_id'],
|
||||
'access_token': CONFIG['Coze']['access_token'],
|
||||
'is_async': CONFIG['Coze']['is_async'],
|
||||
'input_data_template': CONFIG['Coze'].get('input_data_template', '')
|
||||
}
|
||||
|
||||
CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
|
||||
CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
|
||||
CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
|
||||
CONFIG['Coze']['input_data_template'] = current_template.get('input_data_template', '')
|
||||
|
||||
logger.info(f"应用模板配置: {current_template.get('name')}")
|
||||
logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
|
||||
logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
|
||||
logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")
|
||||
logger.info(f"Input Template: {CONFIG['Coze']['input_data_template']}")
|
||||
|
||||
try:
|
||||
# 从配置加载 Coze input_data 模板
|
||||
input_data_template_str = CONFIG['Coze'].get('input_data_template')
|
||||
# 解析模板字符串为字典
|
||||
input_data_template = json.loads(input_data_template_str)
|
||||
# 使用实际变量格式化模板
|
||||
title = ""
|
||||
if generation_type == "短篇":
|
||||
input_data = {
|
||||
"article": article_text
|
||||
}
|
||||
print("coze中输入:",input_data)
|
||||
message_content = call_coze_article_workflow(input_data)
|
||||
|
||||
elif generation_type == "文章":
|
||||
print("原文中标题为:",title_text)
|
||||
print("原文中内容为:",article_text)
|
||||
input_data = {
|
||||
"title":title_text,
|
||||
"article": article_text
|
||||
}
|
||||
print("发送的请求数据为:",input_data)
|
||||
title, message_content = call_coze_all_article_workflow(input_data)
|
||||
|
||||
|
||||
finally:
|
||||
# 恢复原始配置(如果有的话)
|
||||
if original_config is not None:
|
||||
CONFIG['Coze']['workflow_id'] = original_config['workflow_id']
|
||||
CONFIG['Coze']['access_token'] = original_config['access_token']
|
||||
CONFIG['Coze']['is_async'] = original_config['is_async']
|
||||
CONFIG['Coze']['input_data_template'] = original_config['input_data_template']
|
||||
|
||||
# 获取当前时间并格式化
|
||||
current_time = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
print("原文章", article_text)
|
||||
print("========================")
|
||||
print("改写后的文章",message_content)
|
||||
|
||||
# 打印当前时间
|
||||
print("当前时间:", current_time)
|
||||
file_name = ""
|
||||
if generation_type == '短篇':
|
||||
file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title_text)[0]
|
||||
elif generation_type == "文章":
|
||||
file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title)[0]
|
||||
|
||||
|
||||
# 创建类型目录
|
||||
type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
|
||||
safe_open_directory(type_dir)
|
||||
|
||||
# 在类型目录下保存文章
|
||||
article_save_path = os.path.join(type_dir, f"{file_name}.txt")
|
||||
|
||||
|
||||
|
||||
|
||||
# 判断文章合规度
|
||||
if text_detection(message_content) == "合规":
|
||||
print("文章合规")
|
||||
pass
|
||||
else:
|
||||
print("文章不合规")
|
||||
return
|
||||
|
||||
with open(article_save_path, 'w', encoding='utf-8') as f:
|
||||
f.write(message_content)
|
||||
logging.info('文本已经保存')
|
||||
|
||||
if img_urls:
|
||||
# 在类型目录下创建图片目录
|
||||
type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
|
||||
safe_open_directory(type_picture_dir)
|
||||
download_and_process_images(img_urls, file_name, type_picture_dir)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"处理链接 {link} 时出错: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None):
|
||||
use_link_path = 'use_link_path.txt'
|
||||
|
||||
# 读取链接
|
||||
links = read_excel(TITLE_BASE_PATH)
|
||||
|
||||
# 过滤已处理的链接
|
||||
filtered_links = []
|
||||
host = CONFIG['Database']['host']
|
||||
user = CONFIG['Database']['user']
|
||||
password = CONFIG['Database']['password']
|
||||
database = CONFIG['Database']['database']
|
||||
|
||||
for link_info in links:
|
||||
link = link_info[0].strip() # 获取链接并去除空白字符
|
||||
# 如果Excel中有类型,使用Excel中的类型,否则使用传入的generation_type
|
||||
article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
|
||||
logging.info(f"总共{len(links)}个链接")
|
||||
# if check_link_exists(host, user, password, database, link):
|
||||
# logger.info(f"链接已存在: {link}")
|
||||
# continue
|
||||
# else:
|
||||
filtered_links.append((link, article_type)) # 保存链接和类型的元组
|
||||
# logger.info(f"链接不存在: {link}")
|
||||
# print("链接不存在,存储到过滤器中:", link)
|
||||
|
||||
if not filtered_links:
|
||||
logger.info("没有新链接需要处理")
|
||||
return []
|
||||
|
||||
# 使用多线程处理链接
|
||||
results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template,generation_type)
|
||||
|
||||
# 记录已处理的链接
|
||||
with open(use_link_path, 'a+', encoding='utf-8') as f:
|
||||
for link, success, _ in results:
|
||||
if success:
|
||||
f.write(link + "\n")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# 创建一个任务队列和结果队列
|
||||
task_queue = queue.Queue()
|
||||
result_queue = queue.Queue()
|
||||
|
||||
|
||||
# 工作线程函数
|
||||
def worker(ai_service, current_template=None,generation_type=None):
|
||||
while True:
|
||||
try:
|
||||
# 从队列中获取任务
|
||||
link = task_queue.get()
|
||||
if link is None: # 结束信号
|
||||
break
|
||||
|
||||
# 处理链接
|
||||
try:
|
||||
logger.info(f"开始处理链接:{link}")
|
||||
process_link(link, ai_service, current_template,generation_type)
|
||||
result_queue.put((link, True, None)) # 成功
|
||||
except Exception as e:
|
||||
result_queue.put((link, False, str(e))) # 失败
|
||||
logger.error(f"处理链接 {link} 时出错: {e}")
|
||||
|
||||
# 标记任务完成
|
||||
task_queue.task_done()
|
||||
except Exception as e:
|
||||
logger.error(f"工作线程出错: {e}")
|
||||
|
||||
|
||||
# 多线程处理链接
|
||||
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None,generation_type=None):
|
||||
if num_threads is None:
|
||||
num_threads = min(MAX_THREADS, len(links))
|
||||
else:
|
||||
num_threads = min(num_threads, MAX_THREADS, len(links))
|
||||
|
||||
# 清空任务队列和结果队列
|
||||
while not task_queue.empty():
|
||||
task_queue.get()
|
||||
while not result_queue.empty():
|
||||
result_queue.get()
|
||||
|
||||
# 创建工作线程
|
||||
threads = []
|
||||
|
||||
# 将AI服务选择和模板配置传递给worker函数
|
||||
for _ in range(num_threads):
|
||||
t = threading.Thread(target=worker, args=(ai_service, current_template,generation_type))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
# 添加任务到队列
|
||||
for link in links:
|
||||
task_queue.put(link)
|
||||
|
||||
# 添加结束信号
|
||||
for _ in range(num_threads):
|
||||
task_queue.put(None)
|
||||
|
||||
# 等待所有线程完成
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# 处理结果
|
||||
results = []
|
||||
while not result_queue.empty():
|
||||
results.append(result_queue.get())
|
||||
|
||||
return results
|
||||
25
ArticleReplaceBatch/model/config.json
Normal file
25
ArticleReplaceBatch/model/config.json
Normal file
@ -0,0 +1,25 @@
|
||||
{
|
||||
"architectures": [
|
||||
"BertForMaskedLM"
|
||||
],
|
||||
"attention_probs_dropout_prob": 0.1,
|
||||
"directionality": "bidi",
|
||||
"hidden_act": "gelu",
|
||||
"hidden_dropout_prob": 0.1,
|
||||
"hidden_size": 768,
|
||||
"initializer_range": 0.02,
|
||||
"intermediate_size": 3072,
|
||||
"layer_norm_eps": 1e-12,
|
||||
"max_position_embeddings": 512,
|
||||
"model_type": "bert",
|
||||
"num_attention_heads": 12,
|
||||
"num_hidden_layers": 12,
|
||||
"pad_token_id": 0,
|
||||
"pooler_fc_size": 768,
|
||||
"pooler_num_attention_heads": 12,
|
||||
"pooler_num_fc_layers": 3,
|
||||
"pooler_size_per_head": 128,
|
||||
"pooler_type": "first_token_transform",
|
||||
"type_vocab_size": 2,
|
||||
"vocab_size": 21128
|
||||
}
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 120 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 28 KiB |
464
ArticleReplaceBatch/replacestr.py
Normal file
464
ArticleReplaceBatch/replacestr.py
Normal file
@ -0,0 +1,464 @@
|
||||
import re
|
||||
import random
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
from typing import List, Tuple, Optional, Dict, Any
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
|
||||
class TextProcessor:
|
||||
"""文本处理器类,支持句子拆分和字符交换"""
|
||||
|
||||
def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
|
||||
"""
|
||||
初始化文本处理器
|
||||
|
||||
Args:
|
||||
min_length: 句子长度阈值
|
||||
custom_punctuation: 自定义标点符号,如果为None则使用默认标点
|
||||
"""
|
||||
self.min_length = min_length
|
||||
self.sentence_endings = custom_punctuation or r'[,!?;?!;]'
|
||||
self.statistics = {
|
||||
'total_sentences': 0,
|
||||
'processed_sentences': 0,
|
||||
'total_chars': 0,
|
||||
'swapped_chars': 0
|
||||
}
|
||||
|
||||
# 设置日志
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def split_sentences(self, text: str) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
按标点符号拆分句子,保留标点符号
|
||||
|
||||
Args:
|
||||
text: 输入文本
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, str]]: 每个元组包含 (句子内容, 标点符号)
|
||||
"""
|
||||
if not text.strip():
|
||||
return []
|
||||
|
||||
# 使用正则表达式拆分,保留分隔符
|
||||
parts = re.split(f'({self.sentence_endings})', text)
|
||||
|
||||
sentences = []
|
||||
i = 0
|
||||
while i < len(parts):
|
||||
content = parts[i].strip()
|
||||
if content: # 非空内容
|
||||
# 检查下一个部分是否是标点符号
|
||||
if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]):
|
||||
punctuation = parts[i + 1]
|
||||
i += 2
|
||||
else:
|
||||
punctuation = ''
|
||||
i += 1
|
||||
sentences.append((content, punctuation))
|
||||
self.statistics['total_sentences'] += 1
|
||||
else:
|
||||
i += 1
|
||||
|
||||
return sentences
|
||||
|
||||
def swap_random_chars(self, sentence: str) -> str:
|
||||
"""
|
||||
对超长句子随机交换相邻两个字符的顺序
|
||||
|
||||
Args:
|
||||
sentence: 输入句子
|
||||
|
||||
Returns:
|
||||
str: 处理后的句子
|
||||
"""
|
||||
# 边界情况处理
|
||||
if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
|
||||
return sentence
|
||||
|
||||
# 转换为字符列表便于操作
|
||||
chars = list(sentence)
|
||||
original_length = len(chars)
|
||||
|
||||
# 确定可交换的范围(避开首尾字符,且需要成对相邻)
|
||||
# 对于长度为n的句子,可交换的相邻对位置为:(1,2), (2,3), ..., (n-3,n-2)
|
||||
start_idx = 1
|
||||
end_idx = len(chars) - 3 # 最后一个可交换对的起始位置
|
||||
|
||||
if end_idx < start_idx:
|
||||
return sentence
|
||||
|
||||
try:
|
||||
# 随机选择一个相邻对的起始位置
|
||||
swap_start = random.randint(start_idx, end_idx)
|
||||
swap_end = swap_start + 1
|
||||
|
||||
# 交换相邻的两个字符
|
||||
chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start]
|
||||
|
||||
# 更新统计信息
|
||||
self.statistics['processed_sentences'] += 1
|
||||
self.statistics['swapped_chars'] += 2
|
||||
|
||||
self.logger.debug(f"交换相邻位置 {swap_start} 和 {swap_end},句子长度:{original_length}")
|
||||
|
||||
except (ValueError, IndexError) as e:
|
||||
self.logger.warning(f"字符交换失败:{e}")
|
||||
return sentence
|
||||
|
||||
return ''.join(chars)
|
||||
|
||||
def process_text(self, text: str) -> str:
|
||||
"""
|
||||
处理文本:拆分句子并对超长句子进行字符交换
|
||||
|
||||
Args:
|
||||
text: 输入文本
|
||||
|
||||
Returns:
|
||||
str: 处理后的文本
|
||||
"""
|
||||
if not text:
|
||||
return text
|
||||
|
||||
# 重置统计信息
|
||||
self.statistics = {
|
||||
'total_sentences': 0,
|
||||
'processed_sentences': 0,
|
||||
'total_chars': len(text),
|
||||
'swapped_chars': 0
|
||||
}
|
||||
|
||||
# 按段落分割
|
||||
paragraphs = text.split('\n')
|
||||
processed_paragraphs = []
|
||||
|
||||
for paragraph in paragraphs:
|
||||
if not paragraph.strip():
|
||||
processed_paragraphs.append(paragraph)
|
||||
continue
|
||||
|
||||
# 拆分句子
|
||||
sentences = self.split_sentences(paragraph)
|
||||
|
||||
# 处理每个句子
|
||||
processed_sentences = []
|
||||
for sentence_content, punctuation in sentences:
|
||||
# 对句子内容进行字符交换
|
||||
processed_content = self.swap_random_chars(sentence_content)
|
||||
processed_sentences.append(processed_content + punctuation)
|
||||
|
||||
# 重新组合句子
|
||||
processed_paragraph = ''.join(processed_sentences)
|
||||
processed_paragraphs.append(processed_paragraph)
|
||||
|
||||
return '\n'.join(processed_paragraphs)
|
||||
|
||||
def get_statistics(self) -> Dict[str, Any]:
|
||||
"""获取处理统计信息"""
|
||||
return self.statistics.copy()
|
||||
|
||||
def print_statistics(self):
|
||||
"""打印处理统计信息"""
|
||||
stats = self.get_statistics()
|
||||
print("\n" + "=" * 50)
|
||||
print("处理统计信息:")
|
||||
print(f"总字符数:{stats['total_chars']}")
|
||||
print(f"总句子数:{stats['total_sentences']}")
|
||||
print(f"处理句子数:{stats['processed_sentences']}")
|
||||
print(f"交换字符数:{stats['swapped_chars']}")
|
||||
if stats['total_sentences'] > 0:
|
||||
print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
|
||||
print("=" * 50)
|
||||
|
||||
|
||||
class FileHandler:
|
||||
"""文件处理器,负责文件的读写操作"""
|
||||
|
||||
@staticmethod
|
||||
def read_file(filename: str) -> str:
|
||||
"""
|
||||
读取文件内容,支持多种编码
|
||||
|
||||
Args:
|
||||
filename: 文件路径
|
||||
|
||||
Returns:
|
||||
str: 文件内容
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: 文件不存在
|
||||
PermissionError: 权限不足
|
||||
UnicodeDecodeError: 编码错误
|
||||
"""
|
||||
if not os.path.exists(filename):
|
||||
raise FileNotFoundError(f"文件 '{filename}' 不存在")
|
||||
|
||||
if not os.access(filename, os.R_OK):
|
||||
raise PermissionError(f"没有读取文件 '{filename}' 的权限")
|
||||
|
||||
# 尝试多种编码格式
|
||||
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
|
||||
|
||||
for encoding in encodings:
|
||||
try:
|
||||
with open(filename, 'r', encoding=encoding) as f:
|
||||
content = f.read()
|
||||
logging.info(f"使用 {encoding} 编码成功读取文件:{filename}")
|
||||
return content
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
||||
raise UnicodeDecodeError(f"无法解码文件 '{filename}',尝试的编码格式:{encodings}")
|
||||
|
||||
@staticmethod
|
||||
def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
|
||||
"""
|
||||
写入文件内容
|
||||
|
||||
Args:
|
||||
filename: 输出文件路径
|
||||
content: 要写入的内容
|
||||
encoding: 编码格式
|
||||
|
||||
Raises:
|
||||
PermissionError: 权限不足
|
||||
OSError: 磁盘空间不足等系统错误
|
||||
"""
|
||||
# 确保目录存在
|
||||
output_dir = os.path.dirname(filename)
|
||||
if output_dir and not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
with open(filename, 'w', encoding=encoding) as f:
|
||||
f.write(content)
|
||||
logging.info(f"成功写入文件:{filename}")
|
||||
except PermissionError:
|
||||
raise PermissionError(f"没有写入文件 '{filename}' 的权限")
|
||||
except OSError as e:
|
||||
raise OSError(f"写入文件 '{filename}' 时发生错误:{e}")
|
||||
|
||||
|
||||
def setup_argument_parser() -> argparse.ArgumentParser:
|
||||
"""设置命令行参数解析器"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='文本句子字符交换处理器',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
使用示例:
|
||||
%(prog)s -f input.txt # 处理文件
|
||||
%(prog)s -t "你的文本内容" # 直接处理文本
|
||||
%(prog)s -f input.txt -l 20 # 设置长度阈值为20
|
||||
%(prog)s -f input.txt -o output.txt # 输出到文件
|
||||
%(prog)s -f input.txt -p "。!?" -s # 自定义标点符号并显示统计
|
||||
"""
|
||||
)
|
||||
|
||||
# 输入选项
|
||||
input_group = parser.add_mutually_exclusive_group(required=True)
|
||||
input_group.add_argument('-f', '--file', help='输入文件路径')
|
||||
input_group.add_argument('-t', '--text', help='直接输入文本')
|
||||
input_group.add_argument('--stdin', action='store_true',
|
||||
help='从标准输入读取文本')
|
||||
|
||||
# 处理选项
|
||||
parser.add_argument('-l', '--length', type=int, default=30,
|
||||
help='句子长度阈值(默认30)')
|
||||
parser.add_argument('-p', '--punctuation',
|
||||
help='自定义标点符号(默认:。!?;?!;)')
|
||||
parser.add_argument('-o', '--output', help='输出文件路径')
|
||||
parser.add_argument('-e', '--encoding', default='utf-8',
|
||||
help='输出文件编码(默认utf-8)')
|
||||
|
||||
# 其他选项
|
||||
parser.add_argument('-s', '--statistics', action='store_true',
|
||||
help='显示处理统计信息')
|
||||
parser.add_argument('-v', '--verbose', action='store_true',
|
||||
help='显示详细日志')
|
||||
parser.add_argument('--seed', type=int, help='随机数种子(用于测试)')
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数:处理命令行参数和文本处理"""
|
||||
parser = setup_argument_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
# 设置日志级别
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
# 设置随机数种子(用于测试)
|
||||
if args.seed:
|
||||
random.seed(args.seed)
|
||||
|
||||
# 获取输入文本
|
||||
try:
|
||||
if args.file:
|
||||
text = FileHandler.read_file(args.file)
|
||||
elif args.text:
|
||||
text = args.text
|
||||
elif args.stdin:
|
||||
text = sys.stdin.read()
|
||||
else:
|
||||
print("错误:请指定输入源")
|
||||
sys.exit(1)
|
||||
|
||||
if not text.strip():
|
||||
print("警告:输入文本为空")
|
||||
sys.exit(0)
|
||||
|
||||
except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
|
||||
print(f"错误:{e}")
|
||||
sys.exit(1)
|
||||
|
||||
# 创建处理器并处理文本
|
||||
try:
|
||||
processor = TextProcessor(
|
||||
min_length=args.length,
|
||||
custom_punctuation=args.punctuation
|
||||
)
|
||||
|
||||
processed_text = processor.process_text(text)
|
||||
|
||||
# 输出结果
|
||||
if args.output:
|
||||
FileHandler.write_file(args.output, processed_text, args.encoding)
|
||||
print(f"处理完成,结果已保存到 '{args.output}'")
|
||||
else:
|
||||
print("处理结果:")
|
||||
print("-" * 50)
|
||||
print(processed_text)
|
||||
|
||||
# 显示统计信息
|
||||
if args.statistics:
|
||||
processor.print_statistics()
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程中发生错误:{e}")
|
||||
if args.verbose:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# 单元测试
|
||||
def run_tests():
|
||||
"""运行基本的单元测试"""
|
||||
print("运行单元测试...")
|
||||
|
||||
# 测试句子拆分
|
||||
processor = TextProcessor(min_length=6)
|
||||
|
||||
# 测试1:普通句子拆分
|
||||
test_text = "这是第一句。这是第二句!第三句?"
|
||||
sentences = processor.split_sentences(test_text)
|
||||
assert len(sentences) == 3, f"期望3个句子,实际{len(sentences)}个"
|
||||
assert sentences[0] == ("这是第一句", "。"), f"第一句解析错误:{sentences[0]}"
|
||||
|
||||
# 测试2:相邻字符交换
|
||||
long_sentence = "这是一个很长的句子用来测试字符交换功能"
|
||||
random.seed(42) # 固定种子以便测试
|
||||
result = processor.swap_random_chars(long_sentence)
|
||||
assert result != long_sentence, "长句子应该被修改"
|
||||
assert len(result) == len(long_sentence), "交换后长度应该不变"
|
||||
|
||||
# 验证只交换了相邻的两个字符
|
||||
diff_count = sum(1 for i, (a, b) in enumerate(zip(long_sentence, result)) if a != b)
|
||||
assert diff_count == 2, f"应该只有2个字符位置发生变化,实际{diff_count}个"
|
||||
|
||||
# 测试3:短句子不变
|
||||
short_sentence = "短句"
|
||||
result = processor.swap_random_chars(short_sentence)
|
||||
assert result == short_sentence, "短句子不应该被修改"
|
||||
|
||||
# 测试4:边界情况
|
||||
empty_result = processor.swap_random_chars("")
|
||||
assert empty_result == "", "空字符串应该保持不变"
|
||||
|
||||
print("✓ 所有测试通过!")
|
||||
|
||||
|
||||
# 示例使用
|
||||
def replace_text(text):
|
||||
# 检查是否运行测试
|
||||
if len(sys.argv) > 1 and sys.argv[1] == 'test':
|
||||
run_tests()
|
||||
sys.exit(0)
|
||||
|
||||
# 命令行模式
|
||||
if len(sys.argv) > 1:
|
||||
main()
|
||||
else:
|
||||
# 示例演示
|
||||
sample_text = text
|
||||
|
||||
print("示例演示:")
|
||||
print("原文:")
|
||||
print(sample_text)
|
||||
print("\n" + "=" * 50 + "\n")
|
||||
|
||||
processor = TextProcessor(min_length=9)
|
||||
processed = processor.process_text(sample_text)
|
||||
print("处理后:")
|
||||
print(processed)
|
||||
|
||||
processor.print_statistics()
|
||||
|
||||
print("\n使用说明:")
|
||||
print("命令行用法:")
|
||||
print(" python script.py -f input.txt # 处理文件")
|
||||
print(" python script.py -t '你的文本内容' # 直接处理文本")
|
||||
print(" python script.py -f input.txt -l 20 # 设置长度阈值为20")
|
||||
print(" python script.py -f input.txt -o output.txt # 输出到文件")
|
||||
print(" python script.py -f input.txt -p '。!?' -s # 自定义标点符号并显示统计")
|
||||
print(" python script.py test # 运行单元测试")
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
|
||||
text = """盘龙江又冒出“神秘生物”啦!这次可不是娃娃鱼,网友都说:这届市民太有才咯!
|
||||
|
||||
01 跑步都能碰到“怪鱼”?昆明市民这操作简直笑死人!
|
||||
咱就说啊,最近昆明盘龙江里的“神秘生物”是不是有点太多啦?上个月万彩城河段才惊现粉色娃娃鱼,前几天又有市民在江边跑步的时候,突然瞅见水里游着一条浑身雪白的“怪鱼”,远远看去,老像国家二级保护动物娃娃鱼了。嘿,这位热心肠的市民啥也没说,直接就报了警,还特别贴心地把鱼捞上岸,装进塑料袋里,就好像生怕这鱼跑了似的。警察赶到的时候,现场都围了一圈人在那看热闹呢,有人拍照,有人录视频,不知道的还以为在江边搞啥“生物展览会”呢!
|
||||
|
||||
02 蝾螈假装娃娃鱼?森林公安说:这是家养的!
|
||||
民警一看这鱼,长得还真有点特别,赶紧联系森林公安来瞅瞅。结果这剧情反转得厉害啊——这压根就不是娃娃鱼,而是一条跟娃娃鱼长得很像的蝾螈!更逗的是,森林公安民警拎着塑料袋看了老半天,还补了一句:“这是家养的。”(这时候我都能想象到围观群众一脸懵的样子)
|
||||
|
||||
网友的神评论都刷爆屏了:
|
||||
|
||||
• “蝾螈:我就出来溜达溜达,咋就进局子了呢?”
|
||||
• “我建议把盘龙江改名叫‘神奇动物江’算了,下次会不会冒出尼斯湖水怪啊?”
|
||||
• “这届市民也太负责了,连家养的宠物都要报警上交!”
|
||||
03 前面有粉色娃娃鱼,后面有白色蝾螈!盘龙江成“网红打卡点”了?
|
||||
其实这已经是盘龙江今年第二次上热搜啦。4月份的时候,有阿姨在江里发现一条1.5米长、12公斤重的粉色娃娃鱼,当时还把专业救援队都给叫来了。这次虽然是个乌龙事儿,但网友都开始瞎想连续剧情节了:“下次是不是该轮到金色锦鲤啦?”
|
||||
|
||||
最逗的是评论区有人把自家鱼缸的照片都晒出来了,说:“警察叔叔,我家这条金龙鱼要不要也交上去啊?”(手动狗头)
|
||||
|
||||
04 警察叔叔重点提醒:这些动物可不能随便抓!
|
||||
虽说这次是虚惊一场,但民警还是一本正经地提醒大家:野生蝾螈和娃娃鱼可都是国家二级保护动物,自己私自去抓或者养,那可是可能要吃法律官司的。特别是现在有些短视频平台上,还有人把保护动物当宠物卖,起一些什么‘小恐龙’‘六角鱼’之类的花里胡哨的名字来忽悠人,大家可千万别上当!
|
||||
|
||||
05 吃瓜群众应对指南
|
||||
要是碰到不认识的动物该咋办呢?记住这个口诀就行:
|
||||
1️⃣ 别伸手去碰(万一这动物有毒或者带着病菌呢)
|
||||
2️⃣ 别给它投喂吃的(乱喂东西可能会把它们害死)
|
||||
3️⃣ 赶紧报警(专业的事儿就交给专业的人来办)
|
||||
|
||||
最后来个灵魂提问:**你觉得盘龙江下次会出现啥神奇生物?**欢迎在评论区尽情开脑洞!
|
||||
|
||||
(本文信息来源:昆明警方发布、都市条形码等官方通报)
|
||||
|
||||
谢谢大家看这篇文章哈,欢迎在评论区留下你的神吐槽!"""
|
||||
|
||||
|
||||
result = replace_text(text)
|
||||
print(result)
|
||||
@ -1,10 +1,13 @@
|
||||
from get_web_content import toutiao_extract_content
|
||||
from get_web_content import toutiao_w_extract_content
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
url = "https://www.toutiao.com/article/7527481094266962473/"
|
||||
title, content, images = toutiao_w_extract_content(url)
|
||||
|
||||
title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7491890368917602825/?log_from=ab01481cf63ba_1744526333347")
|
||||
|
||||
print("title:",title)
|
||||
|
||||
print("article",article)
|
||||
|
||||
print("imgs",imgs)
|
||||
print(f"标题: {title}")
|
||||
print(f"内容长度: {len(content)}")
|
||||
print(f"图片数量: {len(images)}")
|
||||
print("图片URLs:")
|
||||
for i, img_url in enumerate(images, 1):
|
||||
print(f"{i}. {img_url}")
|
||||
117
ArticleReplaceBatch/toutiao_source_enhanced.html
Normal file
117
ArticleReplaceBatch/toutiao_source_enhanced.html
Normal file
File diff suppressed because one or more lines are too long
390
ArticleReplaceBatch/txt2docx.py
Normal file
390
ArticleReplaceBatch/txt2docx.py
Normal file
@ -0,0 +1,390 @@
|
||||
import PySimpleGUI as sg
|
||||
import json
|
||||
|
||||
import os
|
||||
import random
|
||||
|
||||
from docx.shared import Pt, RGBColor
|
||||
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
|
||||
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
||||
from docx.oxml import OxmlElement
|
||||
from docx.oxml.ns import qn
|
||||
from docx.enum.style import WD_STYLE_TYPE
|
||||
from docx import Document
|
||||
from docx.shared import Inches
|
||||
from PIL import Image
|
||||
|
||||
# 保存文件路径的 JSON 文件
|
||||
SETTINGS_FILE = 'settings.json'
|
||||
|
||||
|
||||
def set_picture_wrapping(paragraph):
|
||||
"""
|
||||
设置图片环绕方式
|
||||
:param paragraph:
|
||||
:return:
|
||||
"""
|
||||
# 设置图片环绕方式为上下环绕
|
||||
pPr = paragraph._element.get_or_add_pPr()
|
||||
framePr = OxmlElement('w:framePr')
|
||||
framePr.set(qn('w:wrap'), 'around')
|
||||
framePr.set(qn('w:vAnchor'), 'text')
|
||||
framePr.set(qn('w:hAnchor'), 'text')
|
||||
pPr.append(framePr)
|
||||
|
||||
|
||||
def format_word_document(input_filename, output_filename):
|
||||
# 打开文档
|
||||
doc = Document(input_filename)
|
||||
|
||||
# 创建或更新标题样式
|
||||
style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH)
|
||||
style.font.name = '黑体'
|
||||
style.font.size = Pt(22) # 二号字
|
||||
style.font.color.rgb = RGBColor(0, 0, 255) # 蓝色
|
||||
style.paragraph_format.space_after = Pt(12) # 标题后间距
|
||||
# 创建或更新正文样式
|
||||
style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH)
|
||||
style.font.name = '仿宋'
|
||||
style.font.size = Pt(14) # 四号字
|
||||
style.paragraph_format.first_line_indent = Pt(20) # 首行缩进两字符
|
||||
style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
|
||||
style.paragraph_format.line_spacing = 1.5 # 行间距
|
||||
style.paragraph_format.space_before = Pt(6) # 段前间距
|
||||
style.paragraph_format.space_after = Pt(6) # 段后间距
|
||||
|
||||
# 遍历所有段落
|
||||
for paragraph in doc.paragraphs:
|
||||
# 设置标题格式
|
||||
if paragraph.style.name.startswith('Heading'):
|
||||
paragraph.style = doc.styles['CustomHeading']
|
||||
|
||||
# 设置段落格式
|
||||
else:
|
||||
paragraph.style = doc.styles['CustomBody']
|
||||
|
||||
# 遍历所有图片
|
||||
for rel in doc.part.rels.values():
|
||||
if "image" in rel.target_ref:
|
||||
# 获取图片所在的段落
|
||||
for paragraph in doc.paragraphs:
|
||||
for run in paragraph.runs:
|
||||
if run._element.tag.endswith('}pict'):
|
||||
# 设置图片居中
|
||||
paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
||||
# 设置图片环绕方式为上下环绕
|
||||
set_picture_wrapping(paragraph)
|
||||
paragraph.paragraph_format.space_before = Pt(12)
|
||||
paragraph.paragraph_format.space_after = Pt(12)
|
||||
|
||||
# output_filename = remove_book_titles(output_filename)
|
||||
|
||||
# 保存文档
|
||||
doc.save(output_filename)
|
||||
|
||||
|
||||
def crop_and_replace_images(folder_path):
|
||||
"""
|
||||
修改图片尺寸
|
||||
:param folder_path:
|
||||
:return:
|
||||
"""
|
||||
folder_path = folder_path.strip()
|
||||
# 遍历文件夹中的所有文件
|
||||
if not os.path.exists(folder_path):
|
||||
os.mkdir(folder_path)
|
||||
else:
|
||||
for filename in os.listdir(folder_path):
|
||||
if os.path.exists(filename):
|
||||
# 检查文件扩展名是否为图片格式
|
||||
if filename.lower().endswith(('.jpg','.png')):
|
||||
# 拼接完整的文件路径
|
||||
file_path = os.path.join(folder_path, filename)
|
||||
print("文件夹路径:" + folder_path)
|
||||
print("文件路径:" + file_path)
|
||||
# 打开图片
|
||||
with Image.open(file_path) as img:
|
||||
# 获取图片的尺寸
|
||||
width, height = img.size
|
||||
# 裁剪图片,裁剪下方10px
|
||||
cropped_img = img.crop((0, 0, width, height - (height * 0.2)))
|
||||
# 保存裁剪后的图片,覆盖原文件
|
||||
output_path = file_path[0:file_path.find('.')] + '.png'
|
||||
cropped_img.save(output_path, 'PNG')
|
||||
|
||||
|
||||
def split_text_into_paragraphs(text):
|
||||
"""
|
||||
将文本分割成段落,并在每个段落之间加一个空行
|
||||
:param text: 输入的文本
|
||||
:return: 段落列表
|
||||
"""
|
||||
paragraphs = text.split('\n\n')
|
||||
# 过滤掉空行和只包含空白字符的段落
|
||||
paragraphs = list(filter(lambda p: p.strip(), paragraphs))
|
||||
|
||||
# 在每个段落之间加一个空行
|
||||
paragraphs_with_blank_lines = []
|
||||
for paragraph in paragraphs:
|
||||
paragraphs_with_blank_lines.append(paragraph)
|
||||
paragraphs_with_blank_lines.append('')
|
||||
|
||||
# 移除最后一个多余的空行
|
||||
if paragraphs_with_blank_lines:
|
||||
paragraphs_with_blank_lines.pop()
|
||||
|
||||
return paragraphs_with_blank_lines
|
||||
|
||||
|
||||
def insert_images_into_paragraphs(paragraphs, image_folder, doc, title):
|
||||
"""
|
||||
将图片插入到段落中
|
||||
:param paragraphs:
|
||||
:param image_folder:
|
||||
:param doc:
|
||||
:return:
|
||||
"""
|
||||
|
||||
if os.path.exists(image_folder):
|
||||
images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
img.lower().endswith(('jpg'))])
|
||||
else:
|
||||
images = []
|
||||
|
||||
# 获取图片列表并排序
|
||||
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
# img.lower().endswith(('jpg'))])
|
||||
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
|
||||
# # img.lower().endswith(('png', 'jpg', 'jpeg'))])
|
||||
|
||||
total_images = len(images)
|
||||
|
||||
image_index = 0
|
||||
for i, paragraph in enumerate(paragraphs):
|
||||
|
||||
if "正文:" in paragraph:
|
||||
paragraph = paragraph.replace("正文:", '')
|
||||
p = doc.add_paragraph(paragraph)
|
||||
if os.path.exists(image_folder):
|
||||
# 插入图片
|
||||
if image_index < total_images:
|
||||
img_path = images[image_index]
|
||||
|
||||
# 确保图片路径正确且图片文件存在
|
||||
if os.path.exists(img_path):
|
||||
try:
|
||||
with Image.open(img_path) as img:
|
||||
width, height = img.size
|
||||
doc.add_picture(img_path, width=Inches(width / height * 1.5))
|
||||
image_index += 1
|
||||
except Exception as e:
|
||||
print(f"无法识别图像: {img_path}, 错误: {e}")
|
||||
continue
|
||||
else:
|
||||
print(f"图片路径无效: {img_path}")
|
||||
|
||||
|
||||
def create_word_document(text, image_folder, output_path, title):
|
||||
"""
|
||||
创建Word文档
|
||||
:param text:
|
||||
:param image_folder:
|
||||
:param output_path:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
doc = Document()
|
||||
paragraphs = split_text_into_paragraphs(text)
|
||||
insert_images_into_paragraphs(paragraphs, image_folder, doc, title)
|
||||
# modify_document(doc)
|
||||
doc.save(output_path)
|
||||
try:
|
||||
format_word_document(output_path, output_path)
|
||||
except Exception as e:
|
||||
print(f"格式化文档 {output_path} 时出错: {e}")
|
||||
print(f'文档已保存到: {output_path}')
|
||||
except Exception as e:
|
||||
print(f"创建文档 {output_path} 时出错: {e}")
|
||||
|
||||
|
||||
# 读取指定路径下txt文本的内容
|
||||
def read_text_file(file_path):
|
||||
"""
|
||||
读取指定路径下txt文本的内容
|
||||
:param file_path:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
return file.read()
|
||||
except Exception as e:
|
||||
print(f"读取文件 {file_path} 时出错: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def get_file_name(file_path):
|
||||
"""
|
||||
获取文件名
|
||||
:param file_path:
|
||||
:return:
|
||||
"""
|
||||
return os.path.basename(file_path)
|
||||
|
||||
|
||||
def apply_random_style(paragraph):
|
||||
# 预定义字体颜色列表
|
||||
predefined_font_colors = [
|
||||
RGBColor(255, 0, 0), # 红色
|
||||
RGBColor(255, 165, 0), # 橙色
|
||||
RGBColor(128, 0, 128), # 紫色
|
||||
]
|
||||
|
||||
# 预定义背景颜色列表(手动定义RGB颜色,避免太亮或太深)
|
||||
predefined_bg_colors = [
|
||||
RGBColor(240, 240, 240), # 浅灰色
|
||||
RGBColor(255, 255, 224), # 浅黄色
|
||||
RGBColor(224, 255, 224), # 浅绿色
|
||||
RGBColor(224, 255, 255), # 浅青色
|
||||
RGBColor(255, 228, 225), # 浅粉色
|
||||
RGBColor(240, 248, 255), # 浅蓝色
|
||||
]
|
||||
|
||||
# 获取段落中的每一个run对象(代表一段连续的文字)
|
||||
for run in paragraph.runs:
|
||||
# 随机选择样式
|
||||
style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background'])
|
||||
|
||||
if style_choice == 'bold':
|
||||
run.bold = True
|
||||
elif style_choice == 'italic':
|
||||
run.italic = True
|
||||
elif style_choice == 'underline':
|
||||
run.underline = WD_UNDERLINE.SINGLE
|
||||
elif style_choice == 'color':
|
||||
# 从预定义颜色中随机选择一个颜色
|
||||
run.font.color.rgb = random.choice(predefined_font_colors)
|
||||
elif style_choice == 'background':
|
||||
# 从预定义背景颜色中随机选择一个颜色
|
||||
run.font.color.highlight_color = random.choice(predefined_bg_colors)
|
||||
|
||||
|
||||
def txt2docx(txt_path, image_path, keep_txt=True):
|
||||
file_path = txt_path
|
||||
try:
|
||||
txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if
|
||||
txt.lower().endswith(('txt'))])
|
||||
except Exception as e:
|
||||
print(f"读取文件夹 {file_path} 时出错: {e}")
|
||||
sg.popup_error(f"读取文件夹 {file_path} 时出错: {e}")
|
||||
return
|
||||
|
||||
img_path = image_path
|
||||
|
||||
for txt in txts:
|
||||
try:
|
||||
print("正在修改:" + txt)
|
||||
text = read_text_file(txt)
|
||||
if not text: # 如果读取失败,跳过此文件
|
||||
print(f"跳过文件: {txt} (读取失败)")
|
||||
continue
|
||||
|
||||
# print(text)
|
||||
txt_name = get_file_name(txt)
|
||||
title_name = txt_name.replace(".txt", "")
|
||||
title = title_name
|
||||
print(title)
|
||||
if "正文:" in text:
|
||||
new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
|
||||
else:
|
||||
new_text = text.replace("```markdown", "").replace("```", "")
|
||||
content = new_text
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
img_path = Path(img_path)
|
||||
image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")
|
||||
|
||||
# crop_and_replace_images(image_folder)
|
||||
|
||||
create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)
|
||||
|
||||
# 根据用户选择决定是否删除原始txt文件
|
||||
if not keep_txt:
|
||||
try:
|
||||
os.remove(txt)
|
||||
print(f"已删除原始文件: {txt}")
|
||||
except Exception as e:
|
||||
print(f"删除文件 {txt} 时出错: {e}")
|
||||
else:
|
||||
print(f"保留原始文件: {txt}")
|
||||
except Exception as e:
|
||||
print(f"处理文件 {txt} 时出错: {e}")
|
||||
continue # 继续处理下一个文件
|
||||
|
||||
|
||||
# 加载设置
|
||||
def load_settings():
|
||||
if os.path.exists(SETTINGS_FILE):
|
||||
with open(SETTINGS_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
return {'folder1': '', 'folder2': ''}
|
||||
|
||||
|
||||
# 保存设置
|
||||
def save_settings(settings):
|
||||
with open(SETTINGS_FILE, 'w') as f:
|
||||
json.dump(settings, f)
|
||||
|
||||
|
||||
# 自定义函数,用于处理用户选择的文件夹
|
||||
def process_folders(folder1, folder2, keep_txt=True):
|
||||
# 检查文件夹是否存在
|
||||
if not os.path.exists(folder1):
|
||||
sg.popup_error(f"文章文件夹不存在: {folder1}")
|
||||
return
|
||||
if not os.path.exists(folder2):
|
||||
sg.popup_error(f"图片文件夹不存在: {folder2}")
|
||||
return
|
||||
|
||||
# 在这里添加处理文件夹的代码
|
||||
try:
|
||||
txt2docx(folder1, folder2, keep_txt)
|
||||
sg.popup("处理完成!")
|
||||
except Exception as e:
|
||||
sg.popup_error(f"处理过程中出错: {e}")
|
||||
|
||||
|
||||
# 加载之前的设置
|
||||
settings = load_settings()
|
||||
if 'keep_txt' not in settings:
|
||||
settings['keep_txt'] = True
|
||||
|
||||
# 定义窗口的布局
|
||||
layout = [
|
||||
[sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()],
|
||||
[sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()],
|
||||
[sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')],
|
||||
[sg.Button('确认'), sg.Button('取消')]
|
||||
]
|
||||
|
||||
# 创建窗口
|
||||
window = sg.Window('文件夹选择窗口', layout)
|
||||
|
||||
# 事件循环
|
||||
while True:
|
||||
event, values = window.read()
|
||||
if event == sg.WIN_CLOSED or event == '取消': # 如果用户关闭窗口或点击取消按钮
|
||||
break
|
||||
elif event == '确认': # 如果用户点击确认按钮
|
||||
folder1 = values[0]
|
||||
folder2 = values[1]
|
||||
keep_txt = values['keep_txt']
|
||||
process_folders(folder1, folder2, keep_txt)
|
||||
# 保存用户选择的文件夹路径和保留txt文件的选项
|
||||
settings['folder1'] = folder1
|
||||
settings['folder2'] = folder2
|
||||
settings['keep_txt'] = keep_txt
|
||||
save_settings(settings)
|
||||
|
||||
# 关闭窗口
|
||||
window.close()
|
||||
@ -89,13 +89,62 @@ def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
|
||||
|
||||
|
||||
|
||||
# 读取Excel表格某一列的内容并将内容以列表的形式返回
|
||||
# 读取Excel表格链接列和类型列的内容并将内容以元组列表的形式返回
|
||||
def read_excel(file_name):
|
||||
datas = pd.read_excel(file_name)
|
||||
first_column_name = datas.columns[0]
|
||||
first_colunm_data = datas[first_column_name].tolist()
|
||||
print(first_colunm_data)
|
||||
|
||||
return first_colunm_data
|
||||
first_column_name = datas.columns[0] # 链接列
|
||||
type_column_name = '类型' # 类型列
|
||||
|
||||
links = datas[first_column_name].tolist()
|
||||
# 如果存在类型列就读取,不存在则为默认类型
|
||||
types = datas[type_column_name].tolist() if type_column_name in datas.columns else ['默认'] * len(links)
|
||||
|
||||
# 将链接和类型组合成元组列表
|
||||
result = list(zip(links, types))
|
||||
print(result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
|
||||
"""
|
||||
增强版:处理文件夹中的同名文件,支持更复杂的场景
|
||||
|
||||
参数:
|
||||
folder_path: 文件夹路径
|
||||
filename: 原始文件名
|
||||
|
||||
返回:
|
||||
Tuple[str, bool]: (处理后的文件名, 是否是重命名的)
|
||||
"""
|
||||
base, ext = os.path.splitext(filename)
|
||||
target_path = os.path.join(folder_path, filename)
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
return filename, False
|
||||
|
||||
existing_files = set(os.listdir(folder_path))
|
||||
pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))
|
||||
|
||||
# 找出所有匹配的文件并提取数字
|
||||
numbers = []
|
||||
for f in existing_files:
|
||||
match = pattern.match(f)
|
||||
if match:
|
||||
num = int(match.group(2)) if match.group(2) else 0
|
||||
numbers.append(num)
|
||||
|
||||
next_num = max(numbers) + 1 if numbers else 1
|
||||
new_filename = f"{base}_{next_num}{ext}"
|
||||
|
||||
# 确保新文件名也不存在(处理并发情况)
|
||||
while new_filename in existing_files:
|
||||
next_num += 1
|
||||
new_filename = f"{base}_{next_num}{ext}"
|
||||
|
||||
return new_filename, True
|
||||
8
text translation/.idea/.gitignore
vendored
Normal file
8
text translation/.idea/.gitignore
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
Loading…
Reference in New Issue
Block a user