Compare commits


No commits in common. "3b305f1d7263121bdf0c1f832b39ede550d70caa" and "bd0c6a6ff077c63940570164abf2e4c549b470e3" have entirely different histories.

17 changed files with 226 additions and 3114 deletions

File diff suppressed because it is too large.

View File

@@ -13,7 +13,6 @@ def call_dify_workflow(input_data):
:param input_data: input data passed to the workflow
:return: output of the workflow
"""
logger.info("Dify starting...")
api_key = CONFIG['Dify']['api_key']
user_id = CONFIG['Dify']['user_id']
url = CONFIG['Dify']['url']
@@ -36,138 +35,3 @@ def call_dify_workflow(input_data):
# print("article:", article)
return article
# ========================== Call the Coze workflow ==========================
def call_coze_workflow(parameters):
"""
Call a Coze workflow
:param parameters: input parameters passed to the workflow, as a dict
:return: execution result of the workflow
"""
logger.info("Coze starting...")
workflow_id = CONFIG['Coze']['workflow_id']
access_token = CONFIG['Coze']['access_token']
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
url = "https://api.coze.cn/v1/workflow/run"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"workflow_id": workflow_id,
"parameters": parameters,
"is_async": is_async
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
# data = json.loads(response.text)['data']
# print("data",data['output'])
return response.text
else:
return {
"error": f"Request failed, status code: {response.status_code}",
"detail": response.text
}
def call_coze_article_workflow(parameters):
"""
Call a Coze workflow and return the rewritten article text
:param parameters: input parameters passed to the workflow, as a dict
:return: execution result of the workflow
"""
workflow_id = CONFIG['Coze']['workflow_id']
access_token = CONFIG['Coze']['access_token']
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
url = "https://api.coze.cn/v1/workflow/run"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"workflow_id": workflow_id,
"parameters": parameters,
"is_async": is_async
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
# data = json.loads(response.text)['data']
# print("data",data['output'])
import ast
# Parse the whole result string directly
result_dict = ast.literal_eval(response.text)
# Parse the data field
data_dict = ast.literal_eval(result_dict['data'])
# Get the output value
output_value = data_dict['output']
return output_value
else:
return {
"error": f"Request failed, status code: {response.status_code}",
"detail": response.text
}
def call_coze_all_article_workflow(parameters, is_async=False):
"""
Call a Coze workflow and return the title and article
:param parameters: input parameters passed to the workflow, as a dict
:param is_async: whether to run asynchronously (default False)
:return: execution result of the workflow
"""
workflow_id = CONFIG['Coze']['workflow_id']
access_token = CONFIG['Coze']['access_token']
is_async = CONFIG['Coze']['is_async'].lower() == 'true'
url = "https://api.coze.cn/v1/workflow/run"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"workflow_id": workflow_id,
"parameters": parameters,
"is_async": is_async
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
# data = json.loads(response.text)['data']
# print("data",data['output'])
import ast
# Parse the whole result string directly
result_dict = ast.literal_eval(response.text)
print(result_dict)
# Parse the data field
data_dict = ast.literal_eval(result_dict['data'])
# Get the title and article values
title = data_dict['title']
article = data_dict['article']
return title, article
else:
return {
"error": f"Request failed, status code: {response.status_code}",
"detail": response.text
}
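Both article helpers above parse the HTTP response with ast.literal_eval, which fails on JSON literals such as true, false, and null. A minimal sketch of a safer parse with json.loads, assuming the same response shape (a JSON envelope whose data field is itself a JSON-encoded string); parse_coze_output is a hypothetical helper, not part of the codebase:

import json

def parse_coze_output(response_text, key='output'):
    """Hypothetical helper: parse a Coze run response and pull one field."""
    envelope = json.loads(response_text)  # top-level JSON envelope
    data = json.loads(envelope['data'])   # 'data' is a JSON-encoded string
    return data.get(key)                  # e.g. 'output', 'title', 'article'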

View File

@@ -1,12 +0,0 @@
You'd never guess it: when Ms. Wang from Jiangsu opened her latest electricity bill she was stunned. Her summer bill had suddenly jumped to 800 yuan a month. She unplugged every appliance in the house, even the router, and the next month's bill rose to 900 yuan instead!
According to Modern Express (现代快报), it took a visit from power-bureau staff to find the culprit: air conditioners on standby. None of the three AC units in Ms. Wang's home were unplugged, silently burning through more than 200 kWh a month. Plenty of netizens called it unbelievable, and one commented: "Our TV stays plugged in all year round. No wonder our bill never comes down!"
In fact, State Grid ran the tests long ago: ordinary household appliances draw about 1 to 3 watts on standby. At 20 hours of standby a day, a set-top box alone can eat 30 kWh a year. Worse still, many households keep at least 5 appliances plugged in long-term, which adds up to roughly 300 yuan wasted a year.
I dug out our old electricity meter and found that after pulling every plug, the meter really did spin more slowly. Now everything except the fridge gets unplugged after use, and we saved more than fifty yuan on this month's bill. Is your meter running fast? Go try pulling some plugs!
Life hacks | Household electricity | Power-saving tips | Everyday living
Has your electricity bill changed recently? Share your power-saving tricks in the comments!

View File

@@ -1,11 +0,0 @@
Half a month after Shanghai's new garbage-sorting rules took effect, total fines have passed 2 million yuan! According to Eastday (东方网), Huangpu District alone has issued 23,000 tickets, which works out to a resident being fined every minute on average. Auntie Wang from downstairs was fined 50 yuan the day before yesterday, just for standing an extra half minute at the door of the garbage station.
But you'd never guess that 60% of the city's fines are concentrated in just 3 upscale residential compounds. These compounds are equipped with smart sorting machines and have dedicated guides on site, yet they have become the "hardest-hit areas". Uncle Zhang next door slapped his thigh in frustration: "I spend half an hour sorting at home every day, and I still got fined because my garbage bag was the wrong color!"
Environmental-bureau figures show that since the new rules took effect, the accuracy rate for sorting kitchen waste has actually dropped by 5%. You really can't pin this all on ordinary residents; some compound supervisors can't keep the sorting standards straight themselves. I've personally seen a supervisor toss dry-cell batteries into the hazardous-waste bin, and that costs points!
That said, garbage sorting genuinely benefits the country and its people. The key is to avoid a one-size-fits-all approach and give everyone time to adapt. I hear Hangzhou is piloting a "three reminders before a fine" model, and the results are reportedly better. What do you think? Is garbage sorting going smoothly in your compound?
New garbage-sorting rules | Fine controversy | Shanghai hot topics | Community management | Livelihood policy

View File

@@ -15,14 +15,6 @@ DEFAULT_CONFIG = {
"title_file": "文章链接.xlsx",
"max_threads": "3"
},
"Coze": {
"workflow_id": "",
"access_token": "",
"is_async": "false",
"input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}",
"last_used_template": "",
"last_used_template_type": "文章"
},
"Database": {
"host": "27.106.125.150",
"user": "root",

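The rest of the codebase reads CONFIG as a two-level mapping of strings (e.g. CONFIG['Database']['host']). A minimal sketch of how DEFAULT_CONFIG might seed such an object with configparser; the config.ini filename is an assumption:

import configparser

def load_config(defaults, path='config.ini'):
    config = configparser.ConfigParser()
    config.read_dict(defaults)  # seed with DEFAULT_CONFIG values
    config.read(path)           # overlay user settings if the file exists
    return config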
View File

@@ -10,8 +10,6 @@ from config import *
from utils import safe_open_directory
IMGS_BASE_PATH = CONFIG['General']['images_path']
def crop_and_replace_images(folder_path):
"""
Resize images
@@ -83,25 +81,15 @@ def download_image(image_url, save_path):
print(f"Request error: {e}")
def download_and_process_images(img_urls, article_title, save_dir=None):
def download_and_process_images(img_urls, article_title):
"""
Download and process images
:param img_urls: list of image URLs
:param article_title: article title
:param save_dir: custom save directory; if None, the default directory is used
"""
if save_dir is None:
save_dir = IMGS_BASE_PATH
img_dir_path = os.path.join(str(save_dir), str(article_title))
logger.info(f"Image save path: {img_dir_path}")
img_dir_path = os.path.join(IMGS_BASE_PATH, article_title)
safe_open_directory(img_dir_path)
for i, img_url in enumerate(img_urls):
if img_url.startswith("https"):
imgurl = img_url
else:
imgurl = "https:"+img_url
imgurl = "https:" + img_url
img_path = os.path.join(img_dir_path, f"图片{i}.jpg")
try:
download_image(imgurl, img_path)
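A short usage sketch of download_and_process_images under the simplified signature on the right-hand side, which prefixes https: unconditionally and therefore expects protocol-relative URLs (the URLs below are hypothetical):

img_urls = [
    "//p3-sign.toutiaoimg.com/sample-a.jpg",
    "//p3-sign.toutiaoimg.com/sample-b.jpg",
]
download_and_process_images(img_urls, "sample-article")
# Images land under CONFIG['General']['images_path']/sample-article/ as 图片0.jpg, 图片1.jpg, ...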

View File

@@ -0,0 +1,209 @@
import pandas as pd
import getpass
import sys # import the sys module
import threading
import queue
from ai_studio import call_dify_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ============================== Main program ===========================
def process_link(link):
"""
Process a single link
"""
try:
title_text, article_text, img_urls = "","",[]
if str(link).startswith("https://www.toutiao.com/w"):
title_text, article_text, img_urls = toutiao_w_extract_content(link)
elif str(link).startswith("https://www.toutiao.com/article/"):
title_text, article_text, img_urls = toutiao_extract_content(link)
else:
title_text, article_text, img_urls = "", "", []
# Get database configuration
host = CONFIG['Database']['host']
user = CONFIG['Database']['user']
password = CONFIG['Database']['password']
database = CONFIG['Database']['database']
# Check whether the article contains banned words
check_keywords = check_keywords_in_text(title_text)
if check_keywords:
print("Article contains banned words!")
check_link_insert(host, user, password, database, link)
return
title = extract_content_until_punctuation(article_text).replace("正文:", "")
print(title)
print(article_text)
from datetime import datetime
# Get and format the current time
current_time = datetime.now().strftime("%H:%M:%S")
# Print the current time
print("Current time:", current_time)
input_data = {
"old_article": article_text
}
message_content = call_dify_workflow(input_data)
# Get and format the current time
current_time = datetime.now().strftime("%H:%M:%S")
# Print the current time
print("Current time:", current_time)
finally_article = message_content.replace("正文:", "") + "\n"
article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt")
if '*' in finally_article or '#' in finally_article or "-" in finally_article:
# Replace several characters at once with a regex
old_content = re.sub(r'[*#-]', '', message_content)
else:
# If no replacement is needed, use the original content as-is
old_content = finally_article
print("Rewritten article: " + old_content)
# Remove AI-flavored vocabulary
content = old_content
check_link_insert(host, user, password, database, link)
# Check article compliance; text_detection returns "合规" for compliant text
if text_detection(content) == "合规":
print("Article is compliant")
pass
else:
print("Article is non-compliant")
return
with open(article_save_path, 'w', encoding='utf-8') as f:
f.write(content)
logging.info('Text saved')
if img_urls:
download_and_process_images(img_urls, title)
except Exception as e:
logging.error(f"Error while processing link {link}: {e}")
raise
def link_to_text(prompt1=None, prompt2=None, num_threads=None):
use_link_path = 'use_link_path.txt'
# Read links
links = read_excel(TITLE_BASE_PATH)
# Filter out already-processed links
filtered_links = []
host = CONFIG['Database']['host']
user = CONFIG['Database']['user']
password = CONFIG['Database']['password']
database = CONFIG['Database']['database']
for link in links:
logging.info(f"{len(links)} links in total")
if check_link_exists(host, user, password, database, link):
logger.info(f"Link already exists: {link}")
continue
else:
filtered_links.append(link)
logger.info(f"New link: {link}")
print("New link, adding to the filter:", link)
if not filtered_links:
logger.info("No new links to process")
return []
# Process links with multiple threads
results = process_links_with_threads(filtered_links, num_threads)
# Record processed links
with open(use_link_path, 'a+', encoding='utf-8') as f:
for link, success, _ in results:
if success:
f.write(link + "\n")
return results
# Create a task queue and a result queue
task_queue = queue.Queue()
result_queue = queue.Queue()
# Worker thread function
def worker():
while True:
try:
# Get a task from the queue
link = task_queue.get()
if link is None: # stop sentinel
break
# Process the link
try:
process_link(link)
result_queue.put((link, True, None)) # success
except Exception as e:
result_queue.put((link, False, str(e))) # failure
logger.error(f"Error while processing link {link}: {e}")
# Mark the task as done
task_queue.task_done()
except Exception as e:
logger.error(f"Worker thread error: {e}")
# Process links using multiple threads
def process_links_with_threads(links, num_threads=None):
if num_threads is None:
num_threads = min(MAX_THREADS, len(links))
else:
num_threads = min(num_threads, MAX_THREADS, len(links))
# Create worker threads
threads = []
for _ in range(num_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
threads.append(t)
# Add tasks to the queue
for link in links:
task_queue.put(link)
# Add stop sentinels
for _ in range(num_threads):
task_queue.put(None)
# Wait for all threads to finish
for t in threads:
t.join()
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
return results
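A minimal sketch of driving the pipeline above, assuming TITLE_BASE_PATH points at an Excel sheet with one link per row:

if __name__ == "__main__":
    results = link_to_text(num_threads=3)
    for link, success, error in results:
        print(link, "ok" if success else f"failed: {error}")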

View File

@@ -1,299 +0,0 @@
import threading
import queue
import json # 导入 json 模块
from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow
from databases import *
from images_edit import download_and_process_images
from utils import *
from get_web_content import *
from config import *
# ============================== Main program ===========================
def process_link(link_info, ai_service, current_template=None, generation_type=None):
link, article_type = link_info # unpack the link and its article type
"""
Process a single link
:param link_info: (link, article_type) tuple to process
:param ai_service: AI service provider; one of: dify, coze
:param current_template: currently selected template configuration
"""
try:
if link.startswith("https://www.toutiao.com"):
title_text, article_text, img_urls = toutiao_w_extract_content(link)
if title_text == "":
title_text, article_text, img_urls = toutiao_extract_content(link)
elif link.startswith("https://mp.weixin.qq.co"):
title_text, article_text, img_urls = wechat_extract_content(link)
elif link.startswith("https://www.163.com"):
title_text, article_text, img_urls = wangyi_extract_content(link)
else:
title_text, article_text, img_urls = "", "", []
if title_text == "":
return
elif len(title_text) > 100:
return
# Get database configuration
host = CONFIG['Database']['host']
user = CONFIG['Database']['user']
password = CONFIG['Database']['password']
database = CONFIG['Database']['database']
# Check whether the article contains banned words
check_keywords = check_keywords_in_text(title_text)
title = extract_content_until_punctuation(article_text).replace("正文:", "")
from datetime import datetime
# Get and format the current time
current_time = datetime.now().strftime("%H:%M:%S")
# Print the current time
print("Current time:", current_time)
if ai_service == "dify":
if check_keywords:
print("Article contains banned words!")
check_link_insert(host, user, password, database, link)
return
# Load the input_data template from config
input_data_template_str = CONFIG['Dify'].get('input_data_template', '{{"old_article": "{article_text}"}}')
try:
# Parse the template string into a dict
input_data_template = json.loads(input_data_template_str)
# Format the template with the actual variables
input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()}
except (json.JSONDecodeError, KeyError, AttributeError) as e:
logger.error(f"Error while processing the Dify input_data template: {e}. Falling back to the default template.")
input_data = {
"old_article": article_text
}
# input_data = {
# "old_article": article_text
# }
message_content = call_dify_workflow(input_data)
elif ai_service == "coze":
logger.info("Coze is processing")
logger.info(f"Article type being processed: {generation_type}")
# If a template configuration is provided, temporarily update CONFIG
original_config = None
if current_template:
original_config = {
'workflow_id': CONFIG['Coze']['workflow_id'],
'access_token': CONFIG['Coze']['access_token'],
'is_async': CONFIG['Coze']['is_async'],
'input_data_template': CONFIG['Coze'].get('input_data_template', '')
}
CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '')
CONFIG['Coze']['access_token'] = current_template.get('access_token', '')
CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true')
CONFIG['Coze']['input_data_template'] = current_template.get('input_data_template', '')
logger.info(f"Applying template configuration: {current_template.get('name')}")
logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}")
logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}")
logger.info(f"Is Async: {CONFIG['Coze']['is_async']}")
logger.info(f"Input Template: {CONFIG['Coze']['input_data_template']}")
try:
# Load the Coze input_data template from config
input_data_template_str = CONFIG['Coze'].get('input_data_template')
# Parse the template string into a dict
input_data_template = json.loads(input_data_template_str)
# Format the template with the actual variables
title = ""
if generation_type == "短篇":
input_data = {
"article": article_text
}
print("Coze input:", input_data)
message_content = call_coze_article_workflow(input_data)
elif generation_type == "文章":
print("Original title:", title_text)
print("Original content:", article_text)
input_data = {
"title": title_text,
"article": article_text
}
print("Request payload being sent:", input_data)
title, message_content = call_coze_all_article_workflow(input_data)
finally:
# Restore the original configuration (if any)
if original_config is not None:
CONFIG['Coze']['workflow_id'] = original_config['workflow_id']
CONFIG['Coze']['access_token'] = original_config['access_token']
CONFIG['Coze']['is_async'] = original_config['is_async']
CONFIG['Coze']['input_data_template'] = original_config['input_data_template']
# Get and format the current time
current_time = datetime.now().strftime("%H:%M:%S")
print("Original article:", article_text)
print("========================")
print("Rewritten article:", message_content)
# Print the current time
print("Current time:", current_time)
file_name = ""
if generation_type == '短篇':
file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title_text)[0]
elif generation_type == "文章":
file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title)[0]
# Create the type directory
type_dir = os.path.join(ARTICLES_BASE_PATH, article_type)
safe_open_directory(type_dir)
# Save the article under the type directory
article_save_path = os.path.join(type_dir, f"{file_name}.txt")
# Check article compliance; text_detection returns "合规" for compliant text
if text_detection(message_content) == "合规":
print("Article is compliant")
pass
else:
print("Article is non-compliant")
return
with open(article_save_path, 'w', encoding='utf-8') as f:
f.write(message_content)
logging.info('Text saved')
if img_urls:
# Create an image directory under the type directory
type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type)
safe_open_directory(type_picture_dir)
download_and_process_images(img_urls, file_name, type_picture_dir)
except Exception as e:
logging.error(f"Error while processing link {link}: {e}")
raise
def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None):
use_link_path = 'use_link_path.txt'
# Read links
links = read_excel(TITLE_BASE_PATH)
# Filter out already-processed links
filtered_links = []
host = CONFIG['Database']['host']
user = CONFIG['Database']['user']
password = CONFIG['Database']['password']
database = CONFIG['Database']['database']
for link_info in links:
link = link_info[0].strip() # get the link and strip whitespace
# If the Excel sheet has a type column, use its value; otherwise fall back to the passed-in generation_type
article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type
logging.info(f"{len(links)} links in total")
# if check_link_exists(host, user, password, database, link):
# logger.info(f"Link already exists: {link}")
# continue
# else:
filtered_links.append((link, article_type)) # save the (link, type) tuple
# logger.info(f"New link: {link}")
# print("New link, adding to the filter:", link)
if not filtered_links:
logger.info("No new links to process")
return []
# Process links with multiple threads
results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template, generation_type)
# Record processed links
with open(use_link_path, 'a+', encoding='utf-8') as f:
for link, success, _ in results:
if success:
f.write(link + "\n")
return results
# Create a task queue and a result queue
task_queue = queue.Queue()
result_queue = queue.Queue()
# Worker thread function
def worker(ai_service, current_template=None, generation_type=None):
while True:
try:
# Get a task from the queue
link = task_queue.get()
if link is None: # stop sentinel
break
# Process the link
try:
logger.info(f"Starting to process link: {link}")
process_link(link, ai_service, current_template, generation_type)
result_queue.put((link, True, None)) # success
except Exception as e:
result_queue.put((link, False, str(e))) # failure
logger.error(f"Error while processing link {link}: {e}")
# Mark the task as done
task_queue.task_done()
except Exception as e:
logger.error(f"Worker thread error: {e}")
# Process links using multiple threads
def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None, generation_type=None):
if num_threads is None:
num_threads = min(MAX_THREADS, len(links))
else:
num_threads = min(num_threads, MAX_THREADS, len(links))
# Drain the task and result queues
while not task_queue.empty():
task_queue.get()
while not result_queue.empty():
result_queue.get()
# Create worker threads
threads = []
# Pass the AI service selection and template configuration to each worker
for _ in range(num_threads):
t = threading.Thread(target=worker, args=(ai_service, current_template, generation_type))
t.daemon = True
t.start()
threads.append(t)
# Add tasks to the queue
for link in links:
task_queue.put(link)
# Add stop sentinels
for _ in range(num_threads):
task_queue.put(None)
# Wait for all threads to finish
for t in threads:
t.join()
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
return results

View File

@@ -1,25 +0,0 @@
{
"architectures": [
"BertForMaskedLM"
],
"attention_probs_dropout_prob": 0.1,
"directionality": "bidi",
"hidden_act": "gelu",
"hidden_dropout_prob": 0.1,
"hidden_size": 768,
"initializer_range": 0.02,
"intermediate_size": 3072,
"layer_norm_eps": 1e-12,
"max_position_embeddings": 512,
"model_type": "bert",
"num_attention_heads": 12,
"num_hidden_layers": 12,
"pad_token_id": 0,
"pooler_fc_size": 768,
"pooler_num_attention_heads": 12,
"pooler_num_fc_layers": 3,
"pooler_size_per_head": 128,
"pooler_type": "first_token_transform",
"type_vocab_size": 2,
"vocab_size": 21128
}
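This is a standard BERT-base configuration; the 21128-entry vocabulary matches bert-base-chinese. A minimal sketch of loading it with Hugging Face transformers, assuming the file is saved as config.json:

from transformers import BertConfig, BertForMaskedLM

config = BertConfig.from_json_file("config.json")  # the JSON above
model = BertForMaskedLM(config)                    # architecture only; weights are randomly initialized
print(config.hidden_size, config.num_hidden_layers)  # 768 12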

View File

@@ -1,464 +0,0 @@
import re
import random
import argparse
import sys
import os
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path
import logging
class TextProcessor:
"""Text processor supporting sentence splitting and character swapping"""
def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None):
"""
Initialize the text processor
Args:
min_length: sentence length threshold
custom_punctuation: custom punctuation; if None, the default punctuation is used
"""
self.min_length = min_length
self.sentence_endings = custom_punctuation or r'[。!?;?!;]'
self.statistics = {
'total_sentences': 0,
'processed_sentences': 0,
'total_chars': 0,
'swapped_chars': 0
}
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
def split_sentences(self, text: str) -> List[Tuple[str, str]]:
"""
Split text into sentences on punctuation, keeping the punctuation marks
Args:
text: input text
Returns:
List[Tuple[str, str]]: each tuple is (sentence content, punctuation)
"""
if not text.strip():
return []
# Split with a regex, keeping the delimiters
parts = re.split(f'({self.sentence_endings})', text)
sentences = []
i = 0
while i < len(parts):
content = parts[i].strip()
if content: # non-empty content
# Check whether the next part is punctuation
if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]):
punctuation = parts[i + 1]
i += 2
else:
punctuation = ''
i += 1
sentences.append((content, punctuation))
self.statistics['total_sentences'] += 1
else:
i += 1
return sentences
def swap_random_chars(self, sentence: str) -> str:
"""
For an overlong sentence, randomly swap two adjacent characters
Args:
sentence: input sentence
Returns:
str: the processed sentence
"""
# Handle edge cases
if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3:
return sentence
# Convert to a list of characters for easier manipulation
chars = list(sentence)
original_length = len(chars)
# Determine the swappable range (avoid the first and last characters; the pair must be adjacent)
# For a sentence of length n, adjacent-pair start positions are (1,2), (2,3), ..., (n-3,n-2)
start_idx = 1
end_idx = len(chars) - 3 # start position of the last swappable pair
if end_idx < start_idx:
return sentence
try:
# Randomly pick the start position of an adjacent pair
swap_start = random.randint(start_idx, end_idx)
swap_end = swap_start + 1
# Swap the two adjacent characters
chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start]
# Update the statistics
self.statistics['processed_sentences'] += 1
self.statistics['swapped_chars'] += 2
self.logger.debug(f"Swapped adjacent positions {swap_start} and {swap_end}; sentence length: {original_length}")
except (ValueError, IndexError) as e:
self.logger.warning(f"Character swap failed: {e}")
return sentence
return ''.join(chars)
def process_text(self, text: str) -> str:
"""
Process text: split into sentences and apply character swaps to overlong ones
Args:
text: input text
Returns:
str: the processed text
"""
if not text:
return text
# Reset the statistics
self.statistics = {
'total_sentences': 0,
'processed_sentences': 0,
'total_chars': len(text),
'swapped_chars': 0
}
# Split by paragraph
paragraphs = text.split('\n')
processed_paragraphs = []
for paragraph in paragraphs:
if not paragraph.strip():
processed_paragraphs.append(paragraph)
continue
# Split into sentences
sentences = self.split_sentences(paragraph)
# Process each sentence
processed_sentences = []
for sentence_content, punctuation in sentences:
# Swap characters within the sentence content
processed_content = self.swap_random_chars(sentence_content)
processed_sentences.append(processed_content + punctuation)
# Reassemble the sentences
processed_paragraph = ''.join(processed_sentences)
processed_paragraphs.append(processed_paragraph)
return '\n'.join(processed_paragraphs)
def get_statistics(self) -> Dict[str, Any]:
"""Return a copy of the processing statistics"""
return self.statistics.copy()
def print_statistics(self):
"""Print the processing statistics"""
stats = self.get_statistics()
print("\n" + "=" * 50)
print("Processing statistics:")
print(f"Total characters: {stats['total_chars']}")
print(f"Total sentences: {stats['total_sentences']}")
print(f"Sentences processed: {stats['processed_sentences']}")
print(f"Characters swapped: {stats['swapped_chars']}")
if stats['total_sentences'] > 0:
print(f"Processing rate: {stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%")
print("=" * 50)
class FileHandler:
"""File handler responsible for reading and writing files"""
@staticmethod
def read_file(filename: str) -> str:
"""
Read file contents, trying multiple encodings
Args:
filename: file path
Returns:
str: file contents
Raises:
FileNotFoundError: the file does not exist
PermissionError: insufficient permissions
UnicodeError: the file could not be decoded
"""
if not os.path.exists(filename):
raise FileNotFoundError(f"File '{filename}' does not exist")
if not os.access(filename, os.R_OK):
raise PermissionError(f"No permission to read file '{filename}'")
# Try several encodings in turn
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
for encoding in encodings:
try:
with open(filename, 'r', encoding=encoding) as f:
content = f.read()
logging.info(f"Read file {filename} successfully with {encoding} encoding")
return content
except UnicodeDecodeError:
continue
raise UnicodeError(f"Unable to decode file '{filename}'; tried encodings: {encodings}")
@staticmethod
def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None:
"""
Write content to a file
Args:
filename: output file path
content: content to write
encoding: encoding to use
Raises:
PermissionError: insufficient permissions
OSError: system errors such as insufficient disk space
"""
# Make sure the directory exists
output_dir = os.path.dirname(filename)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
try:
with open(filename, 'w', encoding=encoding) as f:
f.write(content)
logging.info(f"Wrote file successfully: {filename}")
except PermissionError:
raise PermissionError(f"No permission to write file '{filename}'")
except OSError as e:
raise OSError(f"Error while writing file '{filename}': {e}")
def setup_argument_parser() -> argparse.ArgumentParser:
"""Set up the command-line argument parser"""
parser = argparse.ArgumentParser(
description='Sentence character-swap text processor',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage examples:
%(prog)s -f input.txt # process a file
%(prog)s -t "your text here" # process text directly
%(prog)s -f input.txt -l 20 # set the length threshold to 20
%(prog)s -f input.txt -o output.txt # write output to a file
%(prog)s -f input.txt -p "。!?" -s # custom punctuation and statistics
"""
)
# Input options
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument('-f', '--file', help='input file path')
input_group.add_argument('-t', '--text', help='text to process directly')
input_group.add_argument('--stdin', action='store_true',
help='read text from standard input')
# Processing options
parser.add_argument('-l', '--length', type=int, default=30,
help='sentence length threshold (default: 30)')
parser.add_argument('-p', '--punctuation',
help='custom sentence-ending punctuation (default: 。!?;?!;)')
parser.add_argument('-o', '--output', help='output file path')
parser.add_argument('-e', '--encoding', default='utf-8',
help='output file encoding (default: utf-8)')
# Other options
parser.add_argument('-s', '--statistics', action='store_true',
help='show processing statistics')
parser.add_argument('-v', '--verbose', action='store_true',
help='show verbose logging')
parser.add_argument('--seed', type=int, help='random seed (for testing)')
return parser
def main():
"""Main entry point: parse command-line arguments and process the text"""
parser = setup_argument_parser()
args = parser.parse_args()
# Set the log level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Set the random seed (for testing)
if args.seed:
random.seed(args.seed)
# Get the input text
try:
if args.file:
text = FileHandler.read_file(args.file)
elif args.text:
text = args.text
elif args.stdin:
text = sys.stdin.read()
else:
print("Error: please specify an input source")
sys.exit(1)
if not text.strip():
print("Warning: the input text is empty")
sys.exit(0)
except (FileNotFoundError, PermissionError, UnicodeError) as e:
print(f"Error: {e}")
sys.exit(1)
# Create the processor and process the text
try:
processor = TextProcessor(
min_length=args.length,
custom_punctuation=args.punctuation
)
processed_text = processor.process_text(text)
# Output the result
if args.output:
FileHandler.write_file(args.output, processed_text, args.encoding)
print(f"Done; result saved to '{args.output}'")
else:
print("Result:")
print("-" * 50)
print(processed_text)
# Show statistics
if args.statistics:
processor.print_statistics()
except Exception as e:
print(f"Error during processing: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)
# Unit tests
def run_tests():
"""Run basic unit tests"""
print("Running unit tests...")
# Test sentence splitting
processor = TextProcessor(min_length=6)
# Test 1: ordinary sentence splitting
test_text = "这是第一句。这是第二句!第三句?"
sentences = processor.split_sentences(test_text)
assert len(sentences) == 3, f"Expected 3 sentences, got {len(sentences)}"
assert sentences[0] == ("这是第一句", "。"), f"First sentence parsed incorrectly: {sentences[0]}"
# Test 2: adjacent-character swapping
long_sentence = "这是一个很长的句子用来测试字符交换功能"
random.seed(42) # fixed seed for reproducible tests
result = processor.swap_random_chars(long_sentence)
assert result != long_sentence, "A long sentence should be modified"
assert len(result) == len(long_sentence), "Length should be unchanged after the swap"
# Verify that only two adjacent characters were swapped
diff_count = sum(1 for i, (a, b) in enumerate(zip(long_sentence, result)) if a != b)
assert diff_count == 2, f"Exactly 2 character positions should change, got {diff_count}"
# Test 3: short sentences are left unchanged
short_sentence = "短句"
result = processor.swap_random_chars(short_sentence)
assert result == short_sentence, "A short sentence should not be modified"
# Test 4: edge cases
empty_result = processor.swap_random_chars("")
assert empty_result == "", "An empty string should remain unchanged"
print("✓ All tests passed!")
# Example usage
def replace_text(text):
# Check whether to run the tests
if len(sys.argv) > 1 and sys.argv[1] == 'test':
run_tests()
sys.exit(0)
# Command-line mode
if len(sys.argv) > 1:
main()
else:
# Demo mode
sample_text = text
print("Demo:")
print("Original text:")
print(sample_text)
print("\n" + "=" * 50 + "\n")
processor = TextProcessor(min_length=9)
processed = processor.process_text(sample_text)
print("Processed:")
print(processed)
processor.print_statistics()
print("\nUsage:")
print("Command-line usage:")
print(" python script.py -f input.txt # process a file")
print(" python script.py -t 'your text here' # process text directly")
print(" python script.py -f input.txt -l 20 # set the length threshold to 20")
print(" python script.py -f input.txt -o output.txt # write output to a file")
print(" python script.py -f input.txt -p '。!?' -s # custom punctuation and statistics")
print(" python script.py test # run the unit tests")
return processed
text = """Another "mystery creature" has popped up in the Panlong River! This time it's not a giant salamander, and netizens are saying: this batch of citizens is just too talented!
01 Even a jog turns up a strange fish: this Kunming citizen's move is hilarious
Let's be honest: aren't there a few too many "mystery creatures" in Kunming's Panlong River lately? Just last month a pink giant salamander turned up in the Wancaicheng stretch, and a few days ago a citizen jogging along the river suddenly spotted a snow-white strange fish in the water that, from a distance, looked a lot like the giant salamander, a class-II nationally protected animal. This warm-hearted citizen called the police straight away and even thoughtfully scooped the fish ashore into a plastic bag, as if afraid it would escape. By the time the police arrived, a crowd had gathered to watch the fun, some taking photos, some filming video; anyone out of the loop might have thought a wildlife expo was underway by the river
02 A newt impersonating a giant salamander; the forest police say it's a household pet
One look told the officers this fish really was a bit unusual, so they quickly called in the forest police. Then came the big plot twist: it wasn't a giant salamander at all, but a newt that closely resembles one. Funnier still, the forest police officer held up the plastic bag, studied it for ages, and added: this one's a household pet. I can just picture the blank faces of the onlookers
Netizens' brilliant comments flooded the screen:
The newt: I just stepped out for a stroll, how did I end up at the station?
I suggest renaming the Panlong River the Fantastic Beasts River; will the Loch Ness Monster turn up next?
This batch of citizens is so conscientious they even report and hand in household pets
03 First a pink giant salamander, now a white newt: the Panlong River has become an influencer check-in spot
This is actually the Panlong River's second trip to the trending charts this year. Back in April an auntie found a pink giant salamander in the river, 1.5 meters long and weighing 12 kilograms, and a professional rescue team was called in. This time it was a false alarm, but netizens are already plotting the next episode: is it the golden koi's turn next?
Funniest of all, someone in the comments posted a photo of their own fish tank: Officer, should I hand in my golden arowana too? (manual doge)
04 A key reminder from the police: these animals must not be caught casually
Although this was a false alarm, the police still gave everyone a stern reminder: wild newts and giant salamanders are both class-II nationally protected animals, and catching or keeping them privately can get you into legal trouble. In particular, on some short-video platforms people sell protected animals as pets under flashy names like "little dinosaur" or "six-horned fish" to fool buyers; don't fall for it
05 A response guide for curious bystanders
So what should you do if you run into an animal you don't recognize? Just remember this rule of thumb:
1 Don't reach out and touch it; what if it's venomous or carrying germs?
2 Don't feed it; feeding it the wrong thing could kill it
3 Call the police right away; leave professional matters to the professionals
One last soul-searching question: **what fantastic creature do you think will show up in the Panlong River next?** Let your imagination run wild in the comments
Sources: official notices from the Kunming police and the TV program 都市条形码 (City Barcode), among others
Thanks for reading; leave your wittiest comments below!"""
result = replace_text(text)
print(result)

View File

@@ -1,13 +1,10 @@
from get_web_content import toutiao_w_extract_content
from get_web_content import toutiao_extract_content
# Usage example
if __name__ == "__main__":
url = "https://www.toutiao.com/article/7527481094266962473/"
title, content, images = toutiao_w_extract_content(url)
print(f"Title: {title}")
print(f"Content length: {len(content)}")
print(f"Image count: {len(images)}")
print("Image URLs:")
for i, img_url in enumerate(images, 1):
print(f"{i}. {img_url}")
title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7491890368917602825/?log_from=ab01481cf63ba_1744526333347")
print("title:",title)
print("article",article)
print("imgs",imgs)

File diff suppressed because one or more lines are too long

View File

@@ -1,390 +0,0 @@
import PySimpleGUI as sg
import json
import os
import random
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.enum.style import WD_STYLE_TYPE
from docx import Document
from docx.shared import Inches
from PIL import Image
# JSON file that stores the saved folder paths
SETTINGS_FILE = 'settings.json'
def set_picture_wrapping(paragraph):
"""
Set the image wrapping mode
:param paragraph:
:return:
"""
# Set the image wrapping mode to top-and-bottom
pPr = paragraph._element.get_or_add_pPr()
framePr = OxmlElement('w:framePr')
framePr.set(qn('w:wrap'), 'around')
framePr.set(qn('w:vAnchor'), 'text')
framePr.set(qn('w:hAnchor'), 'text')
pPr.append(framePr)
def format_word_document(input_filename, output_filename):
# Open the document
doc = Document(input_filename)
# Create or update the heading style
style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH)
style.font.name = '黑体'
style.font.size = Pt(22) # size 2 on the Chinese type scale
style.font.color.rgb = RGBColor(0, 0, 255) # blue
style.paragraph_format.space_after = Pt(12) # spacing after headings
# Create or update the body style
style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH)
style.font.name = '仿宋'
style.font.size = Pt(14) # size 4 on the Chinese type scale
style.paragraph_format.first_line_indent = Pt(20) # first-line indent of two characters
style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
style.paragraph_format.line_spacing = 1.5 # line spacing
style.paragraph_format.space_before = Pt(6) # spacing before paragraphs
style.paragraph_format.space_after = Pt(6) # spacing after paragraphs
# Walk all paragraphs
for paragraph in doc.paragraphs:
# Apply the heading format
if paragraph.style.name.startswith('Heading'):
paragraph.style = doc.styles['CustomHeading']
# Apply the body format
else:
paragraph.style = doc.styles['CustomBody']
# Walk all images
for rel in doc.part.rels.values():
if "image" in rel.target_ref:
# Find the paragraph containing the image
for paragraph in doc.paragraphs:
for run in paragraph.runs:
if run._element.tag.endswith('}pict'):
# Center the image
paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Set the image wrapping mode to top-and-bottom
set_picture_wrapping(paragraph)
paragraph.paragraph_format.space_before = Pt(12)
paragraph.paragraph_format.space_after = Pt(12)
# output_filename = remove_book_titles(output_filename)
# Save the document
doc.save(output_filename)
def crop_and_replace_images(folder_path):
"""
Resize images in place
:param folder_path:
:return:
"""
folder_path = folder_path.strip()
# Iterate over all files in the folder
if not os.path.exists(folder_path):
os.mkdir(folder_path)
else:
for filename in os.listdir(folder_path):
if os.path.exists(filename):
# Check whether the file extension is an image format
if filename.lower().endswith(('.jpg','.png')):
# Build the full file path
file_path = os.path.join(folder_path, filename)
print("Folder path: " + folder_path)
print("File path: " + file_path)
# Open the image
with Image.open(file_path) as img:
# Get the image dimensions
width, height = img.size
# Crop the image: trim the bottom 20% of its height
cropped_img = img.crop((0, 0, width, height - (height * 0.2)))
# Save the cropped image, overwriting the original file
output_path = file_path[0:file_path.find('.')] + '.png'
cropped_img.save(output_path, 'PNG')
def split_text_into_paragraphs(text):
"""
Split text into paragraphs and insert a blank line between paragraphs
:param text: input text
:return: list of paragraphs
"""
paragraphs = text.split('\n\n')
# Filter out empty lines and paragraphs containing only whitespace
paragraphs = list(filter(lambda p: p.strip(), paragraphs))
# Insert a blank line between paragraphs
paragraphs_with_blank_lines = []
for paragraph in paragraphs:
paragraphs_with_blank_lines.append(paragraph)
paragraphs_with_blank_lines.append('')
# Remove the final redundant blank line
if paragraphs_with_blank_lines:
paragraphs_with_blank_lines.pop()
return paragraphs_with_blank_lines
def insert_images_into_paragraphs(paragraphs, image_folder, doc, title):
"""
Insert images into the paragraphs
:param paragraphs:
:param image_folder:
:param doc:
:return:
"""
if os.path.exists(image_folder):
images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
img.lower().endswith(('jpg'))])
else:
images = []
# Get and sort the image list
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
# img.lower().endswith(('jpg'))])
# images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if
# # img.lower().endswith(('png', 'jpg', 'jpeg'))])
total_images = len(images)
image_index = 0
for i, paragraph in enumerate(paragraphs):
if "正文:" in paragraph:
paragraph = paragraph.replace("正文:", '')
p = doc.add_paragraph(paragraph)
if os.path.exists(image_folder):
# Insert an image
if image_index < total_images:
img_path = images[image_index]
# Make sure the image path is valid and the file exists
if os.path.exists(img_path):
try:
with Image.open(img_path) as img:
width, height = img.size
doc.add_picture(img_path, width=Inches(width / height * 1.5))
image_index += 1
except Exception as e:
print(f"Unrecognized image: {img_path}, error: {e}")
continue
else:
print(f"Invalid image path: {img_path}")
def create_word_document(text, image_folder, output_path, title):
"""
Create a Word document
:param text:
:param image_folder:
:param output_path:
:return:
"""
try:
doc = Document()
paragraphs = split_text_into_paragraphs(text)
insert_images_into_paragraphs(paragraphs, image_folder, doc, title)
# modify_document(doc)
doc.save(output_path)
try:
format_word_document(output_path, output_path)
except Exception as e:
print(f"Error while formatting document {output_path}: {e}")
print(f'Document saved to: {output_path}')
except Exception as e:
print(f"Error while creating document {output_path}: {e}")
# Read the contents of a txt file at the given path
def read_text_file(file_path):
"""
Read the contents of a txt file at the given path
:param file_path:
:return:
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except Exception as e:
print(f"Error while reading file {file_path}: {e}")
return ""
def get_file_name(file_path):
"""
Get the file name
:param file_path:
:return:
"""
return os.path.basename(file_path)
def apply_random_style(paragraph):
# Predefined font color list
predefined_font_colors = [
RGBColor(255, 0, 0), # red
RGBColor(255, 165, 0), # orange
RGBColor(128, 0, 128), # purple
]
# Predefined background color list (RGB values chosen by hand to avoid colors that are too bright or too dark)
predefined_bg_colors = [
RGBColor(240, 240, 240), # light gray
RGBColor(255, 255, 224), # light yellow
RGBColor(224, 255, 224), # light green
RGBColor(224, 255, 255), # light cyan
RGBColor(255, 228, 225), # light pink
RGBColor(240, 248, 255), # light blue
]
# Walk every run in the paragraph (a run is a stretch of contiguous text)
for run in paragraph.runs:
# Randomly pick a style
style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background'])
if style_choice == 'bold':
run.bold = True
elif style_choice == 'italic':
run.italic = True
elif style_choice == 'underline':
run.underline = WD_UNDERLINE.SINGLE
elif style_choice == 'color':
# Pick a random color from the predefined list
run.font.color.rgb = random.choice(predefined_font_colors)
elif style_choice == 'background':
# Pick a random background color from the predefined list
# (note: python-docx highlights are WD_COLOR_INDEX values; assigning an RGBColor via font.color.highlight_color is a silent no-op)
run.font.color.highlight_color = random.choice(predefined_bg_colors)
def txt2docx(txt_path, image_path, keep_txt=True):
file_path = txt_path
try:
txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if
txt.lower().endswith(('txt'))])
except Exception as e:
print(f"Error while reading folder {file_path}: {e}")
sg.popup_error(f"Error while reading folder {file_path}: {e}")
return
img_path = image_path
for txt in txts:
try:
print("Processing: " + txt)
text = read_text_file(txt)
if not text: # skip this file if reading failed
print(f"Skipping file: {txt} (read failed)")
continue
# print(text)
txt_name = get_file_name(txt)
title_name = txt_name.replace(".txt", "")
title = title_name
print(title)
if "正文:" in text:
new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "")
else:
new_text = text.replace("```markdown", "").replace("```", "")
content = new_text
from pathlib import Path
img_path = Path(img_path)
image_folder = img_path / txt_name.replace(".txt", "").rstrip(".")
# crop_and_replace_images(image_folder)
create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name)
# Delete the original txt file or keep it, per the user's choice
if not keep_txt:
try:
os.remove(txt)
print(f"Deleted original file: {txt}")
except Exception as e:
print(f"Error while deleting file {txt}: {e}")
else:
print(f"Keeping original file: {txt}")
except Exception as e:
print(f"Error while processing file {txt}: {e}")
continue # move on to the next file
# Load settings
def load_settings():
if os.path.exists(SETTINGS_FILE):
with open(SETTINGS_FILE, 'r') as f:
return json.load(f)
return {'folder1': '', 'folder2': ''}
# Save settings
def save_settings(settings):
with open(SETTINGS_FILE, 'w') as f:
json.dump(settings, f)
# Custom handler for the folders the user selected
def process_folders(folder1, folder2, keep_txt=True):
# Check that the folders exist
if not os.path.exists(folder1):
sg.popup_error(f"Article folder does not exist: {folder1}")
return
if not os.path.exists(folder2):
sg.popup_error(f"Image folder does not exist: {folder2}")
return
# Folder-processing code goes here
try:
txt2docx(folder1, folder2, keep_txt)
sg.popup("Done!")
except Exception as e:
sg.popup_error(f"Error during processing: {e}")
# Load the previous settings
settings = load_settings()
if 'keep_txt' not in settings:
settings['keep_txt'] = True
# Define the window layout
layout = [
[sg.Text('Article folder:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()],
[sg.Text('Image folder:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()],
[sg.Checkbox('Keep original txt files', default=settings['keep_txt'], key='keep_txt')],
[sg.Button('Confirm'), sg.Button('Cancel')]
]
# Create the window
window = sg.Window('Folder selection', layout)
# Event loop
while True:
event, values = window.read()
if event == sg.WIN_CLOSED or event == 'Cancel': # user closed the window or clicked Cancel
break
elif event == 'Confirm': # user clicked Confirm
folder1 = values[0]
folder2 = values[1]
keep_txt = values['keep_txt']
process_folders(folder1, folder2, keep_txt)
# Save the chosen folder paths and the keep-txt option
settings['folder1'] = folder1
settings['folder2'] = folder2
settings['keep_txt'] = keep_txt
save_settings(settings)
# Close the window
window.close()
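For reference, a quick sketch of round-tripping the settings used by the window above (paths hypothetical):

settings = {'folder1': 'C:/data/articles', 'folder2': 'C:/data/images', 'keep_txt': True}
save_settings(settings)             # writes settings.json next to the script
assert load_settings() == settings  # load_settings reads the same file back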

View File

@@ -89,62 +89,13 @@ def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
# Read the link and type columns from an Excel sheet and return them as a list of tuples
# Read one column from an Excel sheet and return its contents as a list
def read_excel(file_name):
datas = pd.read_excel(file_name)
first_column_name = datas.columns[0] # link column
type_column_name = '类型' # type column
first_column_name = datas.columns[0]
first_colunm_data = datas[first_column_name].tolist()
print(first_colunm_data)
links = datas[first_column_name].tolist()
# Read the type column if it exists; otherwise use the default type
types = datas[type_column_name].tolist() if type_column_name in datas.columns else ['默认'] * len(links)
# Combine the links and types into a list of tuples
result = list(zip(links, types))
print(result)
return result
return first_colunm_data
from typing import Tuple
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
"""
Enhanced handling of duplicate filenames in a folder, supporting more complex scenarios
Args:
folder_path: folder path
filename: original file name
Returns:
Tuple[str, bool]: (resolved file name, whether it was renamed)
"""
base, ext = os.path.splitext(filename)
target_path = os.path.join(folder_path, filename)
if not os.path.exists(target_path):
return filename, False
existing_files = set(os.listdir(folder_path))
pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext)))
# Find all matching files and extract their numbers
numbers = []
for f in existing_files:
match = pattern.match(f)
if match:
num = int(match.group(2)) if match.group(2) else 0
numbers.append(num)
next_num = max(numbers) + 1 if numbers else 1
new_filename = f"{base}_{next_num}{ext}"
# Make sure the new file name does not exist either (handles concurrent creation)
while new_filename in existing_files:
next_num += 1
new_filename = f"{base}_{next_num}{ext}"
return new_filename, True
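A short usage sketch of handle_duplicate_files_advanced, assuming a folder articles/ that already contains report.txt and report_1.txt (names hypothetical):

new_name, renamed = handle_duplicate_files_advanced("articles", "report.txt")
print(new_name, renamed)  # report_2.txt True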

View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml