Compare commits

10 commits: bd0c6a6ff0 ... 3b305f1d72
| Author | SHA1 | Date |
|---|---|---|
|  | 3b305f1d72 |  |
|  | 42fc2e661f |  |
|  | 0792027bea |  |
|  | 036eb90ca0 |  |
|  | 5397e7cfc2 |  |
|  | 666494c0c2 |  |
|  | 113c97c887 |  |
|  | 2d377de6fd |  |
|  | 65618b2c0b |  |
|  | b9c1a797cb |  |
ArticleReplaceBatch/ArticleReplaceDifyBatchWTT.py (new file, 1563 lines): file diff suppressed because it is too large.
							| @ -13,6 +13,7 @@ def call_dify_workflow(input_data): | ||||
|     :param input_data: 传递给工作流的输入数据 | ||||
|     :return: 工作流的输出结果 | ||||
|     """ | ||||
|     logger.info("Dify开始工作。。。") | ||||
|     api_key = CONFIG['Dify']['api_key'] | ||||
|     user_id = CONFIG['Dify']['user_id'] | ||||
|     url = CONFIG['Dify']['url'] | ||||
| @ -35,3 +36,138 @@ def call_dify_workflow(input_data): | ||||
|     # print("article:", article) | ||||
|     return article | ||||
| 
 | ||||
| 
 | ||||
| # ==========================调用coze工作流========================== | ||||
| 
 | ||||
| 
 | ||||
| def call_coze_workflow(parameters): | ||||
|     """ | ||||
|     调用 Coze 工作流的函数 | ||||
| 
 | ||||
|     :param parameters: 传递给工作流的输入参数(字典格式) | ||||
|     :return: 工作流的执行结果 | ||||
|     """ | ||||
|     logger.info("Coze开始工作。。。。") | ||||
|     workflow_id = CONFIG['Coze']['workflow_id'] | ||||
|     access_token = CONFIG['Coze']['access_token'] | ||||
|     is_async = CONFIG['Coze']['is_async'].lower() == 'true' | ||||
| 
 | ||||
|     url = "https://api.coze.cn/v1/workflow/run" | ||||
| 
 | ||||
|     headers = { | ||||
|         "Authorization": f"Bearer {access_token}", | ||||
|         "Content-Type": "application/json" | ||||
|     } | ||||
| 
 | ||||
|     data = { | ||||
|         "workflow_id": workflow_id, | ||||
|         "parameters": parameters, | ||||
|         "is_async": is_async | ||||
|     } | ||||
| 
 | ||||
|     response = requests.post(url, json=data, headers=headers) | ||||
| 
 | ||||
|     if response.status_code == 200: | ||||
|         # data = json.loads(response.text)['data'] | ||||
|         # print("data:",data['output']) | ||||
| 
 | ||||
|         return response.text | ||||
|     else: | ||||
|         return { | ||||
|             "error": f"请求失败,状态码:{response.status_code}", | ||||
|             "detail": response.text | ||||
|         } | ||||
| 
 | ||||
| 
 | ||||
| def call_coze_article_workflow(parameters): | ||||
|     """ | ||||
|     调用 Coze 工作流的函数 | ||||
| 
 | ||||
|     :param parameters: 传递给工作流的输入参数(字典格式) | ||||
|     :return: 工作流的执行结果 | ||||
|     """ | ||||
| 
 | ||||
|     workflow_id = CONFIG['Coze']['workflow_id'] | ||||
|     access_token = CONFIG['Coze']['access_token'] | ||||
|     is_async = CONFIG['Coze']['is_async'].lower() == 'true' | ||||
|     url = "https://api.coze.cn/v1/workflow/run" | ||||
|     headers = { | ||||
|         "Authorization": f"Bearer {access_token}", | ||||
|         "Content-Type": "application/json" | ||||
|     } | ||||
|     data = { | ||||
|         "workflow_id": workflow_id, | ||||
|         "parameters": parameters, | ||||
|         "is_async": is_async | ||||
|     } | ||||
| 
 | ||||
|     response = requests.post(url, json=data, headers=headers) | ||||
| 
 | ||||
|     if response.status_code == 200: | ||||
|         # data = json.loads(response.text)['data'] | ||||
|         # print("data:",data['output']) | ||||
|         import ast | ||||
| 
 | ||||
|         # 直接解析整个result字符串 | ||||
|         result_dict = ast.literal_eval(response.text) | ||||
| 
 | ||||
|         # 解析data字段 | ||||
|         data_dict = ast.literal_eval(result_dict['data']) | ||||
| 
 | ||||
|         # 获取output的值 | ||||
|         output_value = data_dict['output'] | ||||
| 
 | ||||
|         return output_value | ||||
|     else: | ||||
|         return { | ||||
|             "error": f"请求失败,状态码:{response.status_code}", | ||||
|             "detail": response.text | ||||
|         } | ||||
| 
 | ||||
| 
 | ||||
| def call_coze_all_article_workflow(parameters,is_async=False): | ||||
|     """ | ||||
|     调用 Coze 工作流的函数 | ||||
| 
 | ||||
|     :param parameters: 传递给工作流的输入参数(字典格式) | ||||
|     :param is_async: 是否异步执行(默认 False) | ||||
|     :return: 工作流的执行结果 | ||||
|     """ | ||||
|     workflow_id = CONFIG['Coze']['workflow_id'] | ||||
|     access_token = CONFIG['Coze']['access_token'] | ||||
|     is_async = CONFIG['Coze']['is_async'].lower() == 'true'  # compare with lowercase 'true'; the old check against 'False' could never match | ||||
|     url = "https://api.coze.cn/v1/workflow/run" | ||||
|     headers = { | ||||
|         "Authorization": f"Bearer {access_token}", | ||||
|         "Content-Type": "application/json" | ||||
|     } | ||||
|     data = { | ||||
|         "workflow_id": workflow_id, | ||||
|         "parameters": parameters, | ||||
|         "is_async": is_async | ||||
|     } | ||||
| 
 | ||||
|     response = requests.post(url, json=data, headers=headers) | ||||
| 
 | ||||
|     if response.status_code == 200: | ||||
|         # data = json.loads(response.text)['data'] | ||||
|         # print("data:",data['output']) | ||||
|         import ast | ||||
| 
 | ||||
|         # 直接解析整个result字符串 | ||||
|         result_dict = ast.literal_eval(response.text) | ||||
|         print(result_dict) | ||||
| 
 | ||||
|         # 解析data字段 | ||||
|         data_dict = ast.literal_eval(result_dict['data']) | ||||
| 
 | ||||
|         # 获取output的值 | ||||
|         title = data_dict['title'] | ||||
|         article = data_dict['article'] | ||||
|         return title, article | ||||
|     else: | ||||
|         return { | ||||
|             "error": f"请求失败,状态码:{response.status_code}", | ||||
|             "detail": response.text | ||||
|         } | ||||
|  | ||||
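
Note: the two new Coze article workflows above decode the HTTP body with ast.literal_eval, which cannot parse JSON literals such as true, false or null. A minimal alternative sketch, assuming (as the commented-out json.loads(response.text)['data'] lines suggest) that the body is JSON and that its data field is itself a JSON string; parse_coze_output is a hypothetical helper, not part of this commit:

    import json

    def parse_coze_output(response_text):
        # Decode the outer JSON body, then the nested "data" string, and return its "output" value.
        result_dict = json.loads(response_text)      # json handles true/false/null
        data_dict = json.loads(result_dict["data"])  # "data" is assumed to be a JSON string
        return data_dict.get("output")
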
| @ -0,0 +1,12 @@ | ||||
| 
 | ||||
| 
 | ||||
| 你绝对想不到!江苏王女士最近收到电费单时惊了,夏天每月电费突然涨到800元。她翻出家里所有电器,连路由器都拔了,结果第二个月电费反而涨到900块! | ||||
| 
 | ||||
| 据《现代快报》报道,供电局工作人员上门检查后才发现,罪魁祸首是待机状态的空调。王女士家3台空调插头都没拔,每月能白白耗掉200多度电。这事让不少网友直呼"活久见",有人留言:"我家电视常年插着电源,难怪电费总降不下来!" | ||||
| 
 | ||||
| 其实国家电网早做过测试,普通家电待机功率在1~3瓦之间。按每天待机20小时算,光机顶盒一年就能吃掉30度电。更扎心的是,很多家庭至少有5台电器长期插着电,一年下来相当于白交三百块! | ||||
| 
 | ||||
| 我特意翻出家里老电表,发现拔掉所有插头后,电表真的转得慢了。现在我家冰箱外的电器用完就拔,这个月省了五十多电费。你家电表跑得快吗?赶紧试试拔插头吧! | ||||
| 
 | ||||
| 生活窍门 家庭用电 省电妙招 居家过日子  | ||||
| 你家最近电费有变化吗?评论区聊聊你的省电妙招吧! | ||||
| @ -0,0 +1,11 @@ | ||||
| 
 | ||||
| 
 | ||||
| 上海垃圾分类新规实施半个月,罚款总额突破200万!据东方网报道,光是黄浦区就开出了2.3万张罚单,平均每分钟都有居民被处罚。我家楼下王阿姨前天刚被罚了50块,就因为在垃圾站门口多站了半分钟。 | ||||
| 
 | ||||
| 可你绝对想不到,全市60%的罚款都集中在3个高档小区。这些小区明明配置了智能分类设备,还有专人指导,结果反而成了"重灾区"。隔壁张叔气得直拍大腿:"我天天在家分拣半小时,最后还因为垃圾袋颜色不对被罚!" | ||||
| 
 | ||||
| 据环保局数据显示,新规实施后厨余垃圾分拣正确率反而下降了5%。这事真不能全怪老百姓,有些小区督导员自己都搞不清分类标准。我亲眼见过督导员把干电池扔进有害垃圾箱,那可是要扣分的啊! | ||||
| 
 | ||||
| 不过话说回来,垃圾分类确实是利国利民的好事。关键是不能"一刀切",得给大伙儿适应时间。听说杭州试点"三次提醒再罚款"的模式,效果反而更好。这事您怎么看?您家小区垃圾分类顺利吗? | ||||
| 
 | ||||
| 垃圾分类新规 罚款争议 上海热点 社区管理 民生政策 | ||||
| @ -15,6 +15,14 @@ DEFAULT_CONFIG = { | ||||
|         "title_file": "文章链接.xlsx", | ||||
|         "max_threads": "3" | ||||
|     }, | ||||
|     "Coze": { | ||||
|         "workflow_id": "", | ||||
|         "access_token": "", | ||||
|         "is_async": "false", | ||||
|         "input_data_template": "{\"article\": \"{article_text}\", \"link\":\"{link}\", \"weijin\":\"{weijin}\"}", | ||||
|         "last_used_template": "", | ||||
|         "last_used_template_type": "文章" | ||||
|     }, | ||||
|     "Database": { | ||||
|         "host": "27.106.125.150", | ||||
|         "user": "root", | ||||
|  | ||||
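
The new Coze section mirrors the Dify one: input_data_template is stored as a JSON string whose values carry str.format placeholders. A minimal sketch of how such a template expands, with the article text and link invented for illustration (the shipped Coze branch currently builds its input_data by hand rather than formatting this template):

    import json

    template_str = '{"article": "{article_text}", "link": "{link}", "weijin": "{weijin}"}'
    template = json.loads(template_str)
    # Fill each value with the variables available at call time (placeholder values only).
    input_data = {k: v.format(article_text="正文内容", link="https://example.com/article", weijin="")
                  for k, v in template.items()}
    # -> {'article': '正文内容', 'link': 'https://example.com/article', 'weijin': ''}
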
| @ -10,6 +10,8 @@ from config import * | ||||
| from utils import safe_open_directory | ||||
| 
 | ||||
| IMGS_BASE_PATH = CONFIG['General']['images_path'] | ||||
| 
 | ||||
| 
 | ||||
| def crop_and_replace_images(folder_path): | ||||
|     """ | ||||
|     修改图片尺寸 | ||||
| @ -81,15 +83,25 @@ def download_image(image_url, save_path): | ||||
|         print(f"请求出错:{e}") | ||||
| 
 | ||||
| 
 | ||||
| def download_and_process_images(img_urls, article_title): | ||||
| def download_and_process_images(img_urls, article_title, save_dir=None): | ||||
|     """ | ||||
|     下载并处理图片 | ||||
|     :param img_urls: 图片URL列表 | ||||
|     :param article_title: 文章标题 | ||||
|     :param save_dir: 自定义保存目录,如果为None则使用默认目录 | ||||
|     """ | ||||
|     img_dir_path = os.path.join(IMGS_BASE_PATH, article_title) | ||||
|     if save_dir is None: | ||||
|         save_dir = IMGS_BASE_PATH | ||||
|      | ||||
|     img_dir_path = os.path.join(str(save_dir), str(article_title)) | ||||
|     logger.info(f"图片保存路径:{img_dir_path}") | ||||
|     safe_open_directory(img_dir_path) | ||||
| 
 | ||||
|     for i, img_url in enumerate(img_urls): | ||||
|         imgurl = "https:" + img_url | ||||
|         if img_url.startswith("https"): | ||||
|             imgurl = img_url | ||||
|         else: | ||||
|             imgurl = "https:"+img_url | ||||
|         img_path = os.path.join(img_dir_path, f"图片{i}.jpg") | ||||
|         try: | ||||
|             download_image(imgurl, img_path) | ||||
|  | ||||
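
download_and_process_images now accepts an optional save_dir and tolerates both absolute https URLs and protocol-relative //... URLs. A short usage sketch; the URLs and directory names are placeholders, not values from this repository:

    from images_edit import download_and_process_images

    img_urls = [
        "//p3-sign.toutiaoimg.com/example-1.jpg",        # protocol-relative: gets an "https:" prefix
        "https://p3-sign.toutiaoimg.com/example-2.jpg",  # already absolute: used as-is
    ]
    # Images land in <save_dir>/<article_title>/图片0.jpg, 图片1.jpg, ...
    download_and_process_images(img_urls, "示例文章", save_dir="./output/社会")
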
| @ -1,209 +0,0 @@ | ||||
| import pandas as pd | ||||
| import getpass | ||||
| import sys  # 导入sys模块 | ||||
| import threading | ||||
| import queue | ||||
| 
 | ||||
| 
 | ||||
| from ai_studio import call_dify_workflow | ||||
| from databases import * | ||||
| 
 | ||||
| 
 | ||||
| from images_edit import download_and_process_images | ||||
| from utils import * | ||||
| from get_web_content import  * | ||||
| from config import * | ||||
| 
 | ||||
| # ==============================主程序=========================== | ||||
| def process_link(link): | ||||
|     """ | ||||
|     处理单个链接 | ||||
|     """ | ||||
|     try: | ||||
|         title_text, article_text, img_urls = "","",[] | ||||
|         if str(link).startswith("https://www.toutiao.com/w"): | ||||
|             title_text, article_text, img_urls = toutiao_w_extract_content(link) | ||||
|         elif str(link).startswith("https://www.toutiao.com/article/"): | ||||
|             title_text, article_text, img_urls = toutiao_extract_content(link) | ||||
|         else: | ||||
|             title_text, article_text, img_urls = "", "", [] | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|         # 获取数据库配置 | ||||
|         host = CONFIG['Database']['host'] | ||||
|         user = CONFIG['Database']['user'] | ||||
|         password = CONFIG['Database']['password'] | ||||
|         database = CONFIG['Database']['database'] | ||||
| 
 | ||||
|         # 判断文章内容是否有违禁词 | ||||
|         check_keywords = check_keywords_in_text(title_text) | ||||
| 
 | ||||
|         if check_keywords: | ||||
|             print("文章中有违禁词!") | ||||
|             check_link_insert(host, user, password, database, link) | ||||
|             return | ||||
| 
 | ||||
|         title = extract_content_until_punctuation(article_text).replace("正文:", "") | ||||
| 
 | ||||
|         print(title) | ||||
|         print(article_text) | ||||
| 
 | ||||
|         from datetime import datetime | ||||
| 
 | ||||
|         # 获取当前时间并格式化 | ||||
|         current_time = datetime.now().strftime("%H:%M:%S") | ||||
| 
 | ||||
|         # 打印当前时间 | ||||
|         print("当前时间:", current_time) | ||||
| 
 | ||||
|         input_data = { | ||||
|             "old_article": article_text | ||||
|         } | ||||
| 
 | ||||
|         message_content = call_dify_workflow(input_data) | ||||
|         # 获取当前时间并格式化 | ||||
|         current_time = datetime.now().strftime("%H:%M:%S") | ||||
| 
 | ||||
|         # 打印当前时间 | ||||
|         print("当前时间:", current_time) | ||||
| 
 | ||||
|         finally_article = message_content.replace("正文:", "") + "\n" | ||||
| 
 | ||||
|         article_save_path = os.path.join(ARTICLES_BASE_PATH, f"{title}.txt") | ||||
| 
 | ||||
|         if '*' in finally_article or '#' in finally_article or "-" in finally_article: | ||||
|             # 使用正则表达式一次性替换多个字符 | ||||
|             old_content = re.sub(r'[*#-]', '', message_content) | ||||
|         else: | ||||
|             # 如果不需要替换,直接使用原内容 | ||||
|             old_content = finally_article | ||||
| 
 | ||||
|         print("改写完成的文章:" + old_content) | ||||
| 
 | ||||
|         # 删除AI词汇 | ||||
|         content = old_content | ||||
| 
 | ||||
|         check_link_insert(host, user, password, database, link) | ||||
| 
 | ||||
|         # 判断文章合规度 | ||||
|         if text_detection(content) == "合规": | ||||
|             print("文章合规") | ||||
|             pass | ||||
|         else: | ||||
|             print("文章不合规") | ||||
|             return | ||||
| 
 | ||||
|         with open(article_save_path, 'w', encoding='utf-8') as f: | ||||
|             f.write(content) | ||||
|         logging.info('文本已经保存') | ||||
| 
 | ||||
|         if img_urls: | ||||
|             download_and_process_images(img_urls, title) | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         logging.error(f"处理链接 {link} 时出错: {e}") | ||||
|         raise | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def link_to_text(prompt1=None, prompt2=None, num_threads=None): | ||||
|     use_link_path = 'use_link_path.txt' | ||||
| 
 | ||||
|     # 读取链接 | ||||
|     links = read_excel(TITLE_BASE_PATH) | ||||
| 
 | ||||
|     # 过滤已处理的链接 | ||||
|     filtered_links = [] | ||||
|     host = CONFIG['Database']['host'] | ||||
|     user = CONFIG['Database']['user'] | ||||
|     password = CONFIG['Database']['password'] | ||||
|     database = CONFIG['Database']['database'] | ||||
| 
 | ||||
|     for link in links: | ||||
|         logging.info(f"总共{len(links)}个链接") | ||||
|         if check_link_exists(host, user, password, database, link): | ||||
|             logger.info(f"链接已存在: {link}") | ||||
|             continue | ||||
|         else: | ||||
|             filtered_links.append(link) | ||||
|             logger.info(f"链接不存在: {link}") | ||||
|             print("链接不存在,存储到过滤器中:",link) | ||||
| 
 | ||||
|     if not filtered_links: | ||||
|         logger.info("没有新链接需要处理") | ||||
|         return [] | ||||
| 
 | ||||
|     # 使用多线程处理链接 | ||||
|     results = process_links_with_threads(filtered_links, num_threads) | ||||
| 
 | ||||
|     # 记录已处理的链接 | ||||
|     with open(use_link_path, 'a+', encoding='utf-8') as f: | ||||
|         for link, success, _ in results: | ||||
|             if success: | ||||
|                 f.write(link + "\n") | ||||
| 
 | ||||
|     return results | ||||
| 
 | ||||
| 
 | ||||
| # 创建一个任务队列和结果队列 | ||||
| task_queue = queue.Queue() | ||||
| result_queue = queue.Queue() | ||||
| 
 | ||||
| 
 | ||||
| # 工作线程函数 | ||||
| def worker(): | ||||
|     while True: | ||||
|         try: | ||||
|             # 从队列中获取任务 | ||||
|             link = task_queue.get() | ||||
|             if link is None:  # 结束信号 | ||||
|                 break | ||||
| 
 | ||||
|             # 处理链接 | ||||
|             try: | ||||
|                 process_link(link) | ||||
|                 result_queue.put((link, True, None))  # 成功 | ||||
|             except Exception as e: | ||||
|                 result_queue.put((link, False, str(e)))  # 失败 | ||||
|                 logger.error(f"处理链接 {link} 时出错: {e}") | ||||
| 
 | ||||
|             # 标记任务完成 | ||||
|             task_queue.task_done() | ||||
|         except Exception as e: | ||||
|             logger.error(f"工作线程出错: {e}") | ||||
| 
 | ||||
| 
 | ||||
| # 多线程处理链接 | ||||
| def process_links_with_threads(links, num_threads=None): | ||||
|     if num_threads is None: | ||||
|         num_threads = min(MAX_THREADS, len(links)) | ||||
|     else: | ||||
|         num_threads = min(num_threads, MAX_THREADS, len(links)) | ||||
| 
 | ||||
|     # 创建工作线程 | ||||
|     threads = [] | ||||
|     for _ in range(num_threads): | ||||
|         t = threading.Thread(target=worker) | ||||
|         t.daemon = True | ||||
|         t.start() | ||||
|         threads.append(t) | ||||
| 
 | ||||
|     # 添加任务到队列 | ||||
|     for link in links: | ||||
|         task_queue.put(link) | ||||
| 
 | ||||
|     # 添加结束信号 | ||||
|     for _ in range(num_threads): | ||||
|         task_queue.put(None) | ||||
| 
 | ||||
|     # 等待所有线程完成 | ||||
|     for t in threads: | ||||
|         t.join() | ||||
| 
 | ||||
|     # 处理结果 | ||||
|     results = [] | ||||
|     while not result_queue.empty(): | ||||
|         results.append(result_queue.get()) | ||||
| 
 | ||||
|     return results | ||||
							
								
								
									
ArticleReplaceBatch/main_process_wtt.py (new file, 299 lines)
							| @ -0,0 +1,299 @@ | ||||
| import threading | ||||
| import queue | ||||
| import json  # 导入 json 模块 | ||||
| 
 | ||||
| from ai_studio import call_dify_workflow,call_coze_article_workflow,call_coze_all_article_workflow | ||||
| from databases import * | ||||
| 
 | ||||
| from images_edit import download_and_process_images | ||||
| from utils import * | ||||
| from get_web_content import * | ||||
| from config import * | ||||
| 
 | ||||
| 
 | ||||
| # ==============================主程序=========================== | ||||
| def process_link(link_info, ai_service, current_template=None,generation_type=None): | ||||
|     """ | ||||
|     处理单个链接 | ||||
|     :param link_info: (链接, 文章类型) 元组 | ||||
|     :param ai_service: AI服务提供商,可选值:dify, coze | ||||
|     :param current_template: 当前选择的模板配置 | ||||
|     :param generation_type: 生成类型(短篇 / 文章) | ||||
|     """ | ||||
|     link, article_type = link_info  # 解包链接和类型信息 | ||||
|     try: | ||||
|         if link.startswith("https://www.toutiao.com"): | ||||
|             title_text, article_text, img_urls = toutiao_w_extract_content(link) | ||||
|             if title_text == "": | ||||
|                 title_text, article_text, img_urls = toutiao_extract_content(link) | ||||
|         elif link.startswith("https://mp.weixin.qq.co"): | ||||
|             title_text, article_text, img_urls = wechat_extract_content(link) | ||||
|         elif link.startswith("https://www.163.com"): | ||||
|             title_text, article_text, img_urls = wangyi_extract_content(link) | ||||
|         else: | ||||
|             title_text, article_text, img_urls = "", "", [] | ||||
| 
 | ||||
|         if title_text == "": | ||||
|             return | ||||
|         elif len(title_text) > 100: | ||||
|             return | ||||
| 
 | ||||
|             # 获取数据库配置 | ||||
|         host = CONFIG['Database']['host'] | ||||
|         user = CONFIG['Database']['user'] | ||||
|         password = CONFIG['Database']['password'] | ||||
|         database = CONFIG['Database']['database'] | ||||
| 
 | ||||
|         # 判断文章内容是否有违禁词 | ||||
|         check_keywords = check_keywords_in_text(title_text) | ||||
| 
 | ||||
|         title = extract_content_until_punctuation(article_text).replace("正文:", "") | ||||
| 
 | ||||
| 
 | ||||
|         from datetime import datetime | ||||
| 
 | ||||
|         # 获取当前时间并格式化 | ||||
|         current_time = datetime.now().strftime("%H:%M:%S") | ||||
| 
 | ||||
|         # 打印当前时间 | ||||
|         print("当前时间:", current_time) | ||||
| 
 | ||||
|         if ai_service == "dify": | ||||
|             if check_keywords: | ||||
|                 print("文章中有违禁词!") | ||||
|                 check_link_insert(host, user, password, database, link) | ||||
|                 return | ||||
|             # 从配置加载 input_data 模板 | ||||
|             input_data_template_str = CONFIG['Dify'].get('input_data_template', '{"old_article": "{article_text}"}') | ||||
|             try: | ||||
|                 # 解析模板字符串为字典 | ||||
|                 input_data_template = json.loads(input_data_template_str) | ||||
|                 # 使用实际变量格式化模板 | ||||
|                 input_data = {k: v.format(article_text=article_text) for k, v in input_data_template.items()} | ||||
|             except (json.JSONDecodeError, KeyError, AttributeError) as e: | ||||
|                 logger.error(f"处理 Dify input_data 模板时出错: {e}. 使用默认模板.") | ||||
|                 input_data = { | ||||
|                     "old_article": article_text | ||||
|                 } | ||||
| 
 | ||||
|             # input_data = { | ||||
|             #     "old_article": article_text | ||||
|             # } | ||||
|             message_content = call_dify_workflow(input_data) | ||||
|         elif ai_service == "coze": | ||||
|             logger.info("coze正在处理") | ||||
|             logger.info(f"正在处理的文章类型为:{generation_type}") | ||||
|             # 如果有模板配置,临时更新CONFIG | ||||
|             original_config = None | ||||
|             if current_template: | ||||
|                 original_config = { | ||||
|                     'workflow_id': CONFIG['Coze']['workflow_id'], | ||||
|                     'access_token': CONFIG['Coze']['access_token'], | ||||
|                     'is_async': CONFIG['Coze']['is_async'], | ||||
|                     'input_data_template': CONFIG['Coze'].get('input_data_template', '') | ||||
|                 } | ||||
|                  | ||||
|                 CONFIG['Coze']['workflow_id'] = current_template.get('workflow_id', '') | ||||
|                 CONFIG['Coze']['access_token'] = current_template.get('access_token', '') | ||||
|                 CONFIG['Coze']['is_async'] = current_template.get('is_async', 'true') | ||||
|                 CONFIG['Coze']['input_data_template'] = current_template.get('input_data_template', '') | ||||
|                  | ||||
|                 logger.info(f"应用模板配置: {current_template.get('name')}") | ||||
|                 logger.info(f"Workflow ID: {CONFIG['Coze']['workflow_id']}") | ||||
|                 logger.info(f"Access Token: {'*' * len(CONFIG['Coze']['access_token'])}") | ||||
|                 logger.info(f"Is Async: {CONFIG['Coze']['is_async']}") | ||||
|                 logger.info(f"Input Template: {CONFIG['Coze']['input_data_template']}") | ||||
|              | ||||
|             try: | ||||
|                 # 从配置加载 Coze input_data 模板 | ||||
|                 input_data_template_str = CONFIG['Coze'].get('input_data_template') | ||||
|                 # 解析模板字符串为字典 | ||||
|                 input_data_template = json.loads(input_data_template_str) | ||||
|                 # 使用实际变量格式化模板 | ||||
|                 title = "" | ||||
|                 if generation_type == "短篇": | ||||
|                     input_data = { | ||||
|                         "article": article_text | ||||
|                     } | ||||
|                     print("coze中输入:",input_data) | ||||
|                     message_content = call_coze_article_workflow(input_data) | ||||
| 
 | ||||
|                 elif generation_type == "文章": | ||||
|                     print("原文中标题为:",title_text) | ||||
|                     print("原文中内容为:",article_text) | ||||
|                     input_data = { | ||||
|                         "title":title_text, | ||||
|                         "article": article_text | ||||
|                     } | ||||
|                     print("发送的请求数据为:",input_data) | ||||
|                     title, message_content = call_coze_all_article_workflow(input_data) | ||||
| 
 | ||||
| 
 | ||||
|             finally: | ||||
|                 # 恢复原始配置(如果有的话) | ||||
|                 if original_config is not None: | ||||
|                     CONFIG['Coze']['workflow_id'] = original_config['workflow_id'] | ||||
|                     CONFIG['Coze']['access_token'] = original_config['access_token'] | ||||
|                     CONFIG['Coze']['is_async'] = original_config['is_async'] | ||||
|                     CONFIG['Coze']['input_data_template'] = original_config['input_data_template'] | ||||
| 
 | ||||
|         # 获取当前时间并格式化 | ||||
|         current_time = datetime.now().strftime("%H:%M:%S") | ||||
| 
 | ||||
|         print("原文章", article_text) | ||||
|         print("========================") | ||||
|         print("改写后的文章",message_content) | ||||
| 
 | ||||
|         # 打印当前时间 | ||||
|         print("当前时间:", current_time) | ||||
|         file_name = "" | ||||
|         if generation_type == '短篇': | ||||
|             file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title_text)[0] | ||||
|         elif generation_type == "文章": | ||||
|             file_name = handle_duplicate_files_advanced(ARTICLES_BASE_PATH,title)[0] | ||||
| 
 | ||||
| 
 | ||||
|         # 创建类型目录 | ||||
|         type_dir = os.path.join(ARTICLES_BASE_PATH, article_type) | ||||
|         safe_open_directory(type_dir) | ||||
|          | ||||
|         # 在类型目录下保存文章 | ||||
|         article_save_path = os.path.join(type_dir, f"{file_name}.txt") | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|         # 判断文章合规度 | ||||
|         if text_detection(message_content) == "合规": | ||||
|             print("文章合规") | ||||
|             pass | ||||
|         else: | ||||
|             print("文章不合规") | ||||
|             return | ||||
| 
 | ||||
|         with open(article_save_path, 'w', encoding='utf-8') as f: | ||||
|             f.write(message_content) | ||||
|         logging.info('文本已经保存') | ||||
| 
 | ||||
|         if img_urls: | ||||
|             # 在类型目录下创建图片目录 | ||||
|             type_picture_dir = os.path.join(IMGS_BASE_PATH, article_type) | ||||
|             safe_open_directory(type_picture_dir) | ||||
|             download_and_process_images(img_urls, file_name, type_picture_dir) | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         logging.error(f"处理链接 {link} 时出错: {e}") | ||||
|         raise | ||||
| 
 | ||||
| 
 | ||||
| def link_to_text(num_threads=None, ai_service="dify", current_template=None, generation_type=None): | ||||
|     use_link_path = 'use_link_path.txt' | ||||
| 
 | ||||
|     # 读取链接 | ||||
|     links = read_excel(TITLE_BASE_PATH) | ||||
| 
 | ||||
|     # 过滤已处理的链接 | ||||
|     filtered_links = [] | ||||
|     host = CONFIG['Database']['host'] | ||||
|     user = CONFIG['Database']['user'] | ||||
|     password = CONFIG['Database']['password'] | ||||
|     database = CONFIG['Database']['database'] | ||||
| 
 | ||||
|     for link_info in links: | ||||
|         link = link_info[0].strip()  # 获取链接并去除空白字符 | ||||
|         # 如果Excel中有类型,使用Excel中的类型,否则使用传入的generation_type | ||||
|         article_type = link_info[1].strip() if len(link_info) > 1 and link_info[1].strip() else generation_type | ||||
|         logging.info(f"总共{len(links)}个链接") | ||||
|         # if check_link_exists(host, user, password, database, link): | ||||
|         # logger.info(f"链接已存在: {link}") | ||||
|             # continue | ||||
|         # else: | ||||
|         filtered_links.append((link, article_type))  # 保存链接和类型的元组 | ||||
|             # logger.info(f"链接不存在: {link}") | ||||
|             # print("链接不存在,存储到过滤器中:", link) | ||||
| 
 | ||||
|     if not filtered_links: | ||||
|         logger.info("没有新链接需要处理") | ||||
|         return [] | ||||
| 
 | ||||
|     # 使用多线程处理链接 | ||||
|     results = process_links_with_threads(filtered_links, num_threads, ai_service, current_template,generation_type) | ||||
| 
 | ||||
|     # 记录已处理的链接 | ||||
|     with open(use_link_path, 'a+', encoding='utf-8') as f: | ||||
|         for link, success, _ in results: | ||||
|             if success: | ||||
|                 f.write(link + "\n") | ||||
| 
 | ||||
|     return results | ||||
| 
 | ||||
| 
 | ||||
| # 创建一个任务队列和结果队列 | ||||
| task_queue = queue.Queue() | ||||
| result_queue = queue.Queue() | ||||
| 
 | ||||
| 
 | ||||
| # 工作线程函数 | ||||
| def worker(ai_service, current_template=None,generation_type=None): | ||||
|     while True: | ||||
|         try: | ||||
|             # 从队列中获取任务 | ||||
|             link = task_queue.get() | ||||
|             if link is None:  # 结束信号 | ||||
|                 break | ||||
| 
 | ||||
|             # 处理链接 | ||||
|             try: | ||||
|                 logger.info(f"开始处理链接:{link}") | ||||
|                 process_link(link, ai_service, current_template,generation_type) | ||||
|                 result_queue.put((link, True, None))  # 成功 | ||||
|             except Exception as e: | ||||
|                 result_queue.put((link, False, str(e)))  # 失败 | ||||
|                 logger.error(f"处理链接 {link} 时出错: {e}") | ||||
| 
 | ||||
|             # 标记任务完成 | ||||
|             task_queue.task_done() | ||||
|         except Exception as e: | ||||
|             logger.error(f"工作线程出错: {e}") | ||||
| 
 | ||||
| 
 | ||||
| # 多线程处理链接 | ||||
| def process_links_with_threads(links, num_threads=None, ai_service="dify", current_template=None,generation_type=None): | ||||
|     if num_threads is None: | ||||
|         num_threads = min(MAX_THREADS, len(links)) | ||||
|     else: | ||||
|         num_threads = min(num_threads, MAX_THREADS, len(links)) | ||||
| 
 | ||||
|     # 清空任务队列和结果队列 | ||||
|     while not task_queue.empty(): | ||||
|         task_queue.get() | ||||
|     while not result_queue.empty(): | ||||
|         result_queue.get() | ||||
| 
 | ||||
|     # 创建工作线程 | ||||
|     threads = [] | ||||
| 
 | ||||
|     # 将AI服务选择和模板配置传递给worker函数 | ||||
|     for _ in range(num_threads): | ||||
|         t = threading.Thread(target=worker, args=(ai_service, current_template,generation_type)) | ||||
|         t.daemon = True | ||||
|         t.start() | ||||
|         threads.append(t) | ||||
| 
 | ||||
|     # 添加任务到队列 | ||||
|     for link in links: | ||||
|         task_queue.put(link) | ||||
| 
 | ||||
|     # 添加结束信号 | ||||
|     for _ in range(num_threads): | ||||
|         task_queue.put(None) | ||||
| 
 | ||||
|     # 等待所有线程完成 | ||||
|     for t in threads: | ||||
|         t.join() | ||||
| 
 | ||||
|     # 处理结果 | ||||
|     results = [] | ||||
|     while not result_queue.empty(): | ||||
|         results.append(result_queue.get()) | ||||
| 
 | ||||
|     return results | ||||
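
link_to_text in this new module reads (link, type) rows from the Excel file, fans them out to a sentinel-terminated worker pool, and appends successfully processed links to use_link_path.txt. A minimal invocation sketch; the template values are placeholders for illustration and would normally come from the GUI/config:

    from main_process_wtt import link_to_text

    current_template = {
        "name": "示例模板",
        "workflow_id": "xxxx",
        "access_token": "pat_xxxx",
        "is_async": "false",
        "input_data_template": '{"article": "{article_text}", "link": "{link}", "weijin": "{weijin}"}',
    }
    results = link_to_text(num_threads=3, ai_service="coze",
                           current_template=current_template, generation_type="文章")
    for (url, art_type), ok, err in results:
        print(url, "ok" if ok else f"failed: {err}")
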
							
								
								
									
ArticleReplaceBatch/model/config.json (new file, 25 lines)
							| @ -0,0 +1,25 @@ | ||||
| { | ||||
|   "architectures": [ | ||||
|     "BertForMaskedLM" | ||||
|   ], | ||||
|   "attention_probs_dropout_prob": 0.1, | ||||
|   "directionality": "bidi", | ||||
|   "hidden_act": "gelu", | ||||
|   "hidden_dropout_prob": 0.1, | ||||
|   "hidden_size": 768, | ||||
|   "initializer_range": 0.02, | ||||
|   "intermediate_size": 3072, | ||||
|   "layer_norm_eps": 1e-12, | ||||
|   "max_position_embeddings": 512, | ||||
|   "model_type": "bert", | ||||
|   "num_attention_heads": 12, | ||||
|   "num_hidden_layers": 12, | ||||
|   "pad_token_id": 0, | ||||
|   "pooler_fc_size": 768, | ||||
|   "pooler_num_attention_heads": 12, | ||||
|   "pooler_num_fc_layers": 3, | ||||
|   "pooler_size_per_head": 128, | ||||
|   "pooler_type": "first_token_transform", | ||||
|   "type_vocab_size": 2, | ||||
|   "vocab_size": 21128 | ||||
| } | ||||
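
This config describes a 12-layer Chinese BERT (hidden size 768, vocab 21128) set up for masked language modelling. A minimal loading sketch, assuming the Hugging Face transformers package and that matching weights and vocab.txt sit next to config.json (neither is part of this diff):

    from transformers import BertConfig, BertForMaskedLM

    config = BertConfig.from_json_file("ArticleReplaceBatch/model/config.json")
    model = BertForMaskedLM(config)  # randomly initialised until real weights are loaded
    print(config.num_hidden_layers, config.vocab_size)  # 12 21128
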
										
Binary file not shown (image added, 120 KiB).
Binary file not shown (image added, 28 KiB).
							
								
								
									
ArticleReplaceBatch/replacestr.py (new file, 464 lines)
							| @ -0,0 +1,464 @@ | ||||
| import re | ||||
| import random | ||||
| import argparse | ||||
| import sys | ||||
| import os | ||||
| from typing import List, Tuple, Optional, Dict, Any | ||||
| from pathlib import Path | ||||
| import logging | ||||
| 
 | ||||
| 
 | ||||
| class TextProcessor: | ||||
|     """文本处理器类,支持句子拆分和字符交换""" | ||||
| 
 | ||||
|     def __init__(self, min_length: int = 30, custom_punctuation: Optional[str] = None): | ||||
|         """ | ||||
|         初始化文本处理器 | ||||
| 
 | ||||
|         Args: | ||||
|             min_length: 句子长度阈值 | ||||
|             custom_punctuation: 自定义标点符号,如果为None则使用默认标点 | ||||
|         """ | ||||
|         self.min_length = min_length | ||||
|         self.sentence_endings = custom_punctuation or r'[。,!?;?!;]'  # include 。 so the default matches the CLI help text and the unit tests | ||||
|         self.statistics = { | ||||
|             'total_sentences': 0, | ||||
|             'processed_sentences': 0, | ||||
|             'total_chars': 0, | ||||
|             'swapped_chars': 0 | ||||
|         } | ||||
| 
 | ||||
|         # 设置日志 | ||||
|         logging.basicConfig(level=logging.INFO, | ||||
|                             format='%(asctime)s - %(levelname)s - %(message)s') | ||||
|         self.logger = logging.getLogger(__name__) | ||||
| 
 | ||||
|     def split_sentences(self, text: str) -> List[Tuple[str, str]]: | ||||
|         """ | ||||
|         按标点符号拆分句子,保留标点符号 | ||||
| 
 | ||||
|         Args: | ||||
|             text: 输入文本 | ||||
| 
 | ||||
|         Returns: | ||||
|             List[Tuple[str, str]]: 每个元组包含 (句子内容, 标点符号) | ||||
|         """ | ||||
|         if not text.strip(): | ||||
|             return [] | ||||
| 
 | ||||
|         # 使用正则表达式拆分,保留分隔符 | ||||
|         parts = re.split(f'({self.sentence_endings})', text) | ||||
| 
 | ||||
|         sentences = [] | ||||
|         i = 0 | ||||
|         while i < len(parts): | ||||
|             content = parts[i].strip() | ||||
|             if content:  # 非空内容 | ||||
|                 # 检查下一个部分是否是标点符号 | ||||
|                 if i + 1 < len(parts) and re.match(self.sentence_endings, parts[i + 1]): | ||||
|                     punctuation = parts[i + 1] | ||||
|                     i += 2 | ||||
|                 else: | ||||
|                     punctuation = '' | ||||
|                     i += 1 | ||||
|                 sentences.append((content, punctuation)) | ||||
|                 self.statistics['total_sentences'] += 1 | ||||
|             else: | ||||
|                 i += 1 | ||||
| 
 | ||||
|         return sentences | ||||
| 
 | ||||
|     def swap_random_chars(self, sentence: str) -> str: | ||||
|         """ | ||||
|         对超长句子随机交换相邻两个字符的顺序 | ||||
| 
 | ||||
|         Args: | ||||
|             sentence: 输入句子 | ||||
| 
 | ||||
|         Returns: | ||||
|             str: 处理后的句子 | ||||
|         """ | ||||
|         # 边界情况处理 | ||||
|         if not sentence or len(sentence) <= self.min_length or len(sentence) <= 3: | ||||
|             return sentence | ||||
| 
 | ||||
|         # 转换为字符列表便于操作 | ||||
|         chars = list(sentence) | ||||
|         original_length = len(chars) | ||||
| 
 | ||||
|         # 确定可交换的范围(避开首尾字符,且需要成对相邻) | ||||
|         # 对于长度为n的句子,可交换的相邻对位置为:(1,2), (2,3), ..., (n-3,n-2) | ||||
|         start_idx = 1 | ||||
|         end_idx = len(chars) - 3  # 最后一个可交换对的起始位置 | ||||
| 
 | ||||
|         if end_idx < start_idx: | ||||
|             return sentence | ||||
| 
 | ||||
|         try: | ||||
|             # 随机选择一个相邻对的起始位置 | ||||
|             swap_start = random.randint(start_idx, end_idx) | ||||
|             swap_end = swap_start + 1 | ||||
| 
 | ||||
|             # 交换相邻的两个字符 | ||||
|             chars[swap_start], chars[swap_end] = chars[swap_end], chars[swap_start] | ||||
| 
 | ||||
|             # 更新统计信息 | ||||
|             self.statistics['processed_sentences'] += 1 | ||||
|             self.statistics['swapped_chars'] += 2 | ||||
| 
 | ||||
|             self.logger.debug(f"交换相邻位置 {swap_start} 和 {swap_end},句子长度:{original_length}") | ||||
| 
 | ||||
|         except (ValueError, IndexError) as e: | ||||
|             self.logger.warning(f"字符交换失败:{e}") | ||||
|             return sentence | ||||
| 
 | ||||
|         return ''.join(chars) | ||||
| 
 | ||||
|     def process_text(self, text: str) -> str: | ||||
|         """ | ||||
|         处理文本:拆分句子并对超长句子进行字符交换 | ||||
| 
 | ||||
|         Args: | ||||
|             text: 输入文本 | ||||
| 
 | ||||
|         Returns: | ||||
|             str: 处理后的文本 | ||||
|         """ | ||||
|         if not text: | ||||
|             return text | ||||
| 
 | ||||
|         # 重置统计信息 | ||||
|         self.statistics = { | ||||
|             'total_sentences': 0, | ||||
|             'processed_sentences': 0, | ||||
|             'total_chars': len(text), | ||||
|             'swapped_chars': 0 | ||||
|         } | ||||
| 
 | ||||
|         # 按段落分割 | ||||
|         paragraphs = text.split('\n') | ||||
|         processed_paragraphs = [] | ||||
| 
 | ||||
|         for paragraph in paragraphs: | ||||
|             if not paragraph.strip(): | ||||
|                 processed_paragraphs.append(paragraph) | ||||
|                 continue | ||||
| 
 | ||||
|             # 拆分句子 | ||||
|             sentences = self.split_sentences(paragraph) | ||||
| 
 | ||||
|             # 处理每个句子 | ||||
|             processed_sentences = [] | ||||
|             for sentence_content, punctuation in sentences: | ||||
|                 # 对句子内容进行字符交换 | ||||
|                 processed_content = self.swap_random_chars(sentence_content) | ||||
|                 processed_sentences.append(processed_content + punctuation) | ||||
| 
 | ||||
|             # 重新组合句子 | ||||
|             processed_paragraph = ''.join(processed_sentences) | ||||
|             processed_paragraphs.append(processed_paragraph) | ||||
| 
 | ||||
|         return '\n'.join(processed_paragraphs) | ||||
| 
 | ||||
|     def get_statistics(self) -> Dict[str, Any]: | ||||
|         """获取处理统计信息""" | ||||
|         return self.statistics.copy() | ||||
| 
 | ||||
|     def print_statistics(self): | ||||
|         """打印处理统计信息""" | ||||
|         stats = self.get_statistics() | ||||
|         print("\n" + "=" * 50) | ||||
|         print("处理统计信息:") | ||||
|         print(f"总字符数:{stats['total_chars']}") | ||||
|         print(f"总句子数:{stats['total_sentences']}") | ||||
|         print(f"处理句子数:{stats['processed_sentences']}") | ||||
|         print(f"交换字符数:{stats['swapped_chars']}") | ||||
|         if stats['total_sentences'] > 0: | ||||
|             print(f"处理率:{stats['processed_sentences'] / stats['total_sentences'] * 100:.1f}%") | ||||
|         print("=" * 50) | ||||
| 
 | ||||
| 
 | ||||
| class FileHandler: | ||||
|     """文件处理器,负责文件的读写操作""" | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def read_file(filename: str) -> str: | ||||
|         """ | ||||
|         读取文件内容,支持多种编码 | ||||
| 
 | ||||
|         Args: | ||||
|             filename: 文件路径 | ||||
| 
 | ||||
|         Returns: | ||||
|             str: 文件内容 | ||||
| 
 | ||||
|         Raises: | ||||
|             FileNotFoundError: 文件不存在 | ||||
|             PermissionError: 权限不足 | ||||
|             UnicodeDecodeError: 编码错误 | ||||
|         """ | ||||
|         if not os.path.exists(filename): | ||||
|             raise FileNotFoundError(f"文件 '{filename}' 不存在") | ||||
| 
 | ||||
|         if not os.access(filename, os.R_OK): | ||||
|             raise PermissionError(f"没有读取文件 '{filename}' 的权限") | ||||
| 
 | ||||
|         # 尝试多种编码格式 | ||||
|         encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1'] | ||||
| 
 | ||||
|         for encoding in encodings: | ||||
|             try: | ||||
|                 with open(filename, 'r', encoding=encoding) as f: | ||||
|                     content = f.read() | ||||
|                     logging.info(f"使用 {encoding} 编码成功读取文件:{filename}") | ||||
|                     return content | ||||
|             except UnicodeDecodeError: | ||||
|                 continue | ||||
| 
 | ||||
|         raise UnicodeDecodeError(f"无法解码文件 '{filename}',尝试的编码格式:{encodings}") | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def write_file(filename: str, content: str, encoding: str = 'utf-8') -> None: | ||||
|         """ | ||||
|         写入文件内容 | ||||
| 
 | ||||
|         Args: | ||||
|             filename: 输出文件路径 | ||||
|             content: 要写入的内容 | ||||
|             encoding: 编码格式 | ||||
| 
 | ||||
|         Raises: | ||||
|             PermissionError: 权限不足 | ||||
|             OSError: 磁盘空间不足等系统错误 | ||||
|         """ | ||||
|         # 确保目录存在 | ||||
|         output_dir = os.path.dirname(filename) | ||||
|         if output_dir and not os.path.exists(output_dir): | ||||
|             os.makedirs(output_dir, exist_ok=True) | ||||
| 
 | ||||
|         try: | ||||
|             with open(filename, 'w', encoding=encoding) as f: | ||||
|                 f.write(content) | ||||
|             logging.info(f"成功写入文件:{filename}") | ||||
|         except PermissionError: | ||||
|             raise PermissionError(f"没有写入文件 '{filename}' 的权限") | ||||
|         except OSError as e: | ||||
|             raise OSError(f"写入文件 '{filename}' 时发生错误:{e}") | ||||
| 
 | ||||
| 
 | ||||
| def setup_argument_parser() -> argparse.ArgumentParser: | ||||
|     """设置命令行参数解析器""" | ||||
|     parser = argparse.ArgumentParser( | ||||
|         description='文本句子字符交换处理器', | ||||
|         formatter_class=argparse.RawDescriptionHelpFormatter, | ||||
|         epilog=""" | ||||
| 使用示例: | ||||
|   %(prog)s -f input.txt                    # 处理文件 | ||||
|   %(prog)s -t "你的文本内容"               # 直接处理文本 | ||||
|   %(prog)s -f input.txt -l 20              # 设置长度阈值为20 | ||||
|   %(prog)s -f input.txt -o output.txt      # 输出到文件 | ||||
|   %(prog)s -f input.txt -p "。!?" -s     # 自定义标点符号并显示统计 | ||||
|         """ | ||||
|     ) | ||||
| 
 | ||||
|     # 输入选项 | ||||
|     input_group = parser.add_mutually_exclusive_group(required=True) | ||||
|     input_group.add_argument('-f', '--file', help='输入文件路径') | ||||
|     input_group.add_argument('-t', '--text', help='直接输入文本') | ||||
|     input_group.add_argument('--stdin', action='store_true', | ||||
|                              help='从标准输入读取文本') | ||||
| 
 | ||||
|     # 处理选项 | ||||
|     parser.add_argument('-l', '--length', type=int, default=30, | ||||
|                         help='句子长度阈值(默认30)') | ||||
|     parser.add_argument('-p', '--punctuation', | ||||
|                         help='自定义标点符号(默认:。!?;?!;)') | ||||
|     parser.add_argument('-o', '--output', help='输出文件路径') | ||||
|     parser.add_argument('-e', '--encoding', default='utf-8', | ||||
|                         help='输出文件编码(默认utf-8)') | ||||
| 
 | ||||
|     # 其他选项 | ||||
|     parser.add_argument('-s', '--statistics', action='store_true', | ||||
|                         help='显示处理统计信息') | ||||
|     parser.add_argument('-v', '--verbose', action='store_true', | ||||
|                         help='显示详细日志') | ||||
|     parser.add_argument('--seed', type=int, help='随机数种子(用于测试)') | ||||
| 
 | ||||
|     return parser | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     """主函数:处理命令行参数和文本处理""" | ||||
|     parser = setup_argument_parser() | ||||
|     args = parser.parse_args() | ||||
| 
 | ||||
|     # 设置日志级别 | ||||
|     if args.verbose: | ||||
|         logging.getLogger().setLevel(logging.DEBUG) | ||||
| 
 | ||||
|     # 设置随机数种子(用于测试) | ||||
|     if args.seed: | ||||
|         random.seed(args.seed) | ||||
| 
 | ||||
|     # 获取输入文本 | ||||
|     try: | ||||
|         if args.file: | ||||
|             text = FileHandler.read_file(args.file) | ||||
|         elif args.text: | ||||
|             text = args.text | ||||
|         elif args.stdin: | ||||
|             text = sys.stdin.read() | ||||
|         else: | ||||
|             print("错误:请指定输入源") | ||||
|             sys.exit(1) | ||||
| 
 | ||||
|         if not text.strip(): | ||||
|             print("警告:输入文本为空") | ||||
|             sys.exit(0) | ||||
| 
 | ||||
|     except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e: | ||||
|         print(f"错误:{e}") | ||||
|         sys.exit(1) | ||||
| 
 | ||||
|     # 创建处理器并处理文本 | ||||
|     try: | ||||
|         processor = TextProcessor( | ||||
|             min_length=args.length, | ||||
|             custom_punctuation=args.punctuation | ||||
|         ) | ||||
| 
 | ||||
|         processed_text = processor.process_text(text) | ||||
| 
 | ||||
|         # 输出结果 | ||||
|         if args.output: | ||||
|             FileHandler.write_file(args.output, processed_text, args.encoding) | ||||
|             print(f"处理完成,结果已保存到 '{args.output}'") | ||||
|         else: | ||||
|             print("处理结果:") | ||||
|             print("-" * 50) | ||||
|             print(processed_text) | ||||
| 
 | ||||
|         # 显示统计信息 | ||||
|         if args.statistics: | ||||
|             processor.print_statistics() | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         print(f"处理过程中发生错误:{e}") | ||||
|         if args.verbose: | ||||
|             import traceback | ||||
|             traceback.print_exc() | ||||
|         sys.exit(1) | ||||
| 
 | ||||
| 
 | ||||
| # 单元测试 | ||||
| def run_tests(): | ||||
|     """运行基本的单元测试""" | ||||
|     print("运行单元测试...") | ||||
| 
 | ||||
|     # 测试句子拆分 | ||||
|     processor = TextProcessor(min_length=6) | ||||
| 
 | ||||
|     # 测试1:普通句子拆分 | ||||
|     test_text = "这是第一句。这是第二句!第三句?" | ||||
|     sentences = processor.split_sentences(test_text) | ||||
|     assert len(sentences) == 3, f"期望3个句子,实际{len(sentences)}个" | ||||
|     assert sentences[0] == ("这是第一句", "。"), f"第一句解析错误:{sentences[0]}" | ||||
| 
 | ||||
|     # 测试2:相邻字符交换 | ||||
|     long_sentence = "这是一个很长的句子用来测试字符交换功能" | ||||
|     random.seed(42)  # 固定种子以便测试 | ||||
|     result = processor.swap_random_chars(long_sentence) | ||||
|     assert result != long_sentence, "长句子应该被修改" | ||||
|     assert len(result) == len(long_sentence), "交换后长度应该不变" | ||||
| 
 | ||||
|     # 验证只交换了相邻的两个字符 | ||||
|     diff_count = sum(1 for i, (a, b) in enumerate(zip(long_sentence, result)) if a != b) | ||||
|     assert diff_count == 2, f"应该只有2个字符位置发生变化,实际{diff_count}个" | ||||
| 
 | ||||
|     # 测试3:短句子不变 | ||||
|     short_sentence = "短句" | ||||
|     result = processor.swap_random_chars(short_sentence) | ||||
|     assert result == short_sentence, "短句子不应该被修改" | ||||
| 
 | ||||
|     # 测试4:边界情况 | ||||
|     empty_result = processor.swap_random_chars("") | ||||
|     assert empty_result == "", "空字符串应该保持不变" | ||||
| 
 | ||||
|     print("✓ 所有测试通过!") | ||||
| 
 | ||||
| 
 | ||||
| # 示例使用 | ||||
| def replace_text(text): | ||||
|     # 检查是否运行测试 | ||||
|     if len(sys.argv) > 1 and sys.argv[1] == 'test': | ||||
|         run_tests() | ||||
|         sys.exit(0) | ||||
| 
 | ||||
|     # 命令行模式 | ||||
|     if len(sys.argv) > 1: | ||||
|         main() | ||||
|     else: | ||||
|         # 示例演示 | ||||
|         sample_text = text | ||||
| 
 | ||||
|         print("示例演示:") | ||||
|         print("原文:") | ||||
|         print(sample_text) | ||||
|         print("\n" + "=" * 50 + "\n") | ||||
| 
 | ||||
|         processor = TextProcessor(min_length=9) | ||||
|         processed = processor.process_text(sample_text) | ||||
|         print("处理后:") | ||||
|         print(processed) | ||||
| 
 | ||||
|         processor.print_statistics() | ||||
| 
 | ||||
|         print("\n使用说明:") | ||||
|         print("命令行用法:") | ||||
|         print("  python script.py -f input.txt              # 处理文件") | ||||
|         print("  python script.py -t '你的文本内容'          # 直接处理文本") | ||||
|         print("  python script.py -f input.txt -l 20        # 设置长度阈值为20") | ||||
|         print("  python script.py -f input.txt -o output.txt # 输出到文件") | ||||
|         print("  python script.py -f input.txt -p '。!?' -s # 自定义标点符号并显示统计") | ||||
|         print("  python script.py test                       # 运行单元测试") | ||||
| 
 | ||||
|         return processed | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| text = """盘龙江又冒出“神秘生物”啦!这次可不是娃娃鱼,网友都说:这届市民太有才咯! | ||||
| 
 | ||||
| 01 跑步都能碰到“怪鱼”?昆明市民这操作简直笑死人! | ||||
| 咱就说啊,最近昆明盘龙江里的“神秘生物”是不是有点太多啦?上个月万彩城河段才惊现粉色娃娃鱼,前几天又有市民在江边跑步的时候,突然瞅见水里游着一条浑身雪白的“怪鱼”,远远看去,老像国家二级保护动物娃娃鱼了。嘿,这位热心肠的市民啥也没说,直接就报了警,还特别贴心地把鱼捞上岸,装进塑料袋里,就好像生怕这鱼跑了似的。警察赶到的时候,现场都围了一圈人在那看热闹呢,有人拍照,有人录视频,不知道的还以为在江边搞啥“生物展览会”呢! | ||||
| 
 | ||||
| 02 蝾螈假装娃娃鱼?森林公安说:这是家养的! | ||||
| 民警一看这鱼,长得还真有点特别,赶紧联系森林公安来瞅瞅。结果这剧情反转得厉害啊——这压根就不是娃娃鱼,而是一条跟娃娃鱼长得很像的蝾螈!更逗的是,森林公安民警拎着塑料袋看了老半天,还补了一句:“这是家养的。”(这时候我都能想象到围观群众一脸懵的样子) | ||||
| 
 | ||||
| 网友的神评论都刷爆屏了: | ||||
| 
 | ||||
| • “蝾螈:我就出来溜达溜达,咋就进局子了呢?” | ||||
| • “我建议把盘龙江改名叫‘神奇动物江’算了,下次会不会冒出尼斯湖水怪啊?” | ||||
| • “这届市民也太负责了,连家养的宠物都要报警上交!” | ||||
| 03 前面有粉色娃娃鱼,后面有白色蝾螈!盘龙江成“网红打卡点”了? | ||||
| 其实这已经是盘龙江今年第二次上热搜啦。4月份的时候,有阿姨在江里发现一条1.5米长、12公斤重的粉色娃娃鱼,当时还把专业救援队都给叫来了。这次虽然是个乌龙事儿,但网友都开始瞎想连续剧情节了:“下次是不是该轮到金色锦鲤啦?” | ||||
| 
 | ||||
| 最逗的是评论区有人把自家鱼缸的照片都晒出来了,说:“警察叔叔,我家这条金龙鱼要不要也交上去啊?”(手动狗头) | ||||
| 
 | ||||
| 04 警察叔叔重点提醒:这些动物可不能随便抓! | ||||
| 虽说这次是虚惊一场,但民警还是一本正经地提醒大家:野生蝾螈和娃娃鱼可都是国家二级保护动物,自己私自去抓或者养,那可是可能要吃法律官司的。特别是现在有些短视频平台上,还有人把保护动物当宠物卖,起一些什么‘小恐龙’‘六角鱼’之类的花里胡哨的名字来忽悠人,大家可千万别上当! | ||||
| 
 | ||||
| 05 吃瓜群众应对指南 | ||||
| 要是碰到不认识的动物该咋办呢?记住这个口诀就行: | ||||
| 1️⃣ 别伸手去碰(万一这动物有毒或者带着病菌呢) | ||||
| 2️⃣ 别给它投喂吃的(乱喂东西可能会把它们害死) | ||||
| 3️⃣ 赶紧报警(专业的事儿就交给专业的人来办) | ||||
| 
 | ||||
| 最后来个灵魂提问:**你觉得盘龙江下次会出现啥神奇生物?**欢迎在评论区尽情开脑洞! | ||||
| 
 | ||||
| (本文信息来源:昆明警方发布、都市条形码等官方通报) | ||||
| 
 | ||||
| 谢谢大家看这篇文章哈,欢迎在评论区留下你的神吐槽!""" | ||||
| 
 | ||||
| 
 | ||||
| result = replace_text(text) | ||||
| print(result) | ||||
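
TextProcessor splits text on clause-ending punctuation and, for any clause longer than min_length, swaps one random pair of adjacent inner characters. A small interactive sketch (note that importing replacestr as-is also runs the module-level demo at the bottom of the file):

    from replacestr import TextProcessor

    p = TextProcessor(min_length=10)
    print(p.split_sentences("今天天气不错,适合出门散步!"))
    # -> [('今天天气不错', ','), ('适合出门散步', '!')]
    print(p.process_text("这是一段足够长的句子用来演示随机交换相邻字符的效果。"))
    # -> same text with one random pair of adjacent characters swapped
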
| @ -1,10 +1,13 @@ | ||||
| from get_web_content import toutiao_extract_content | ||||
| from get_web_content import toutiao_w_extract_content | ||||
| 
 | ||||
| # 使用示例 | ||||
| if __name__ == "__main__": | ||||
|     url = "https://www.toutiao.com/article/7527481094266962473/" | ||||
|     title, content, images = toutiao_w_extract_content(url) | ||||
| 
 | ||||
| title,article,imgs = toutiao_extract_content("https://www.toutiao.com/article/7491890368917602825/?log_from=ab01481cf63ba_1744526333347") | ||||
| 
 | ||||
| print("title:",title) | ||||
| 
 | ||||
| print("article",article) | ||||
| 
 | ||||
| print("imgs",imgs) | ||||
|     print(f"标题: {title}") | ||||
|     print(f"内容长度: {len(content)}") | ||||
|     print(f"图片数量: {len(images)}") | ||||
|     print("图片URLs:") | ||||
|     for i, img_url in enumerate(images, 1): | ||||
|         print(f"{i}. {img_url}") | ||||
							
								
								
									
ArticleReplaceBatch/toutiao_source_enhanced.html (new file, 117 lines): file diff suppressed because one or more lines are too long.
							
								
								
									
ArticleReplaceBatch/txt2docx.py (new file, 390 lines)
							| @ -0,0 +1,390 @@ | ||||
| import PySimpleGUI as sg | ||||
| import json | ||||
| 
 | ||||
| import os | ||||
| import random | ||||
| 
 | ||||
| from docx.shared import Pt, RGBColor | ||||
| from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_UNDERLINE | ||||
| from docx.enum.text import WD_ALIGN_PARAGRAPH | ||||
| from docx.oxml import OxmlElement | ||||
| from docx.oxml.ns import qn | ||||
| from docx.enum.style import WD_STYLE_TYPE | ||||
| from docx import Document | ||||
| from docx.shared import Inches | ||||
| from PIL import Image | ||||
| 
 | ||||
| # 保存文件路径的 JSON 文件 | ||||
| SETTINGS_FILE = 'settings.json' | ||||
| 
 | ||||
| 
 | ||||
| def set_picture_wrapping(paragraph): | ||||
|     """ | ||||
|     设置图片环绕方式 | ||||
|     :param paragraph: | ||||
|     :return: | ||||
|     """ | ||||
|     # 设置图片环绕方式为上下环绕 | ||||
|     pPr = paragraph._element.get_or_add_pPr() | ||||
|     framePr = OxmlElement('w:framePr') | ||||
|     framePr.set(qn('w:wrap'), 'around') | ||||
|     framePr.set(qn('w:vAnchor'), 'text') | ||||
|     framePr.set(qn('w:hAnchor'), 'text') | ||||
|     pPr.append(framePr) | ||||
| 
 | ||||
| 
 | ||||
| def format_word_document(input_filename, output_filename): | ||||
|     # 打开文档 | ||||
|     doc = Document(input_filename) | ||||
| 
 | ||||
|     # 创建或更新标题样式 | ||||
|     style = doc.styles.add_style('CustomHeading', WD_STYLE_TYPE.PARAGRAPH) | ||||
|     style.font.name = '黑体' | ||||
|     style.font.size = Pt(22)  # 二号字 | ||||
|     style.font.color.rgb = RGBColor(0, 0, 255)  # 蓝色 | ||||
|     style.paragraph_format.space_after = Pt(12)  # 标题后间距 | ||||
|     # 创建或更新正文样式 | ||||
|     style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH) | ||||
|     style.font.name = '仿宋' | ||||
|     style.font.size = Pt(14)  # 四号字 | ||||
|     style.paragraph_format.first_line_indent = Pt(20)  # 首行缩进两字符 | ||||
|     style.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT | ||||
|     style.paragraph_format.line_spacing = 1.5  # 行间距 | ||||
|     style.paragraph_format.space_before = Pt(6)  # 段前间距 | ||||
|     style.paragraph_format.space_after = Pt(6)  # 段后间距 | ||||
| 
 | ||||
|     # 遍历所有段落 | ||||
|     for paragraph in doc.paragraphs: | ||||
|         # 设置标题格式 | ||||
|         if paragraph.style.name.startswith('Heading'): | ||||
|             paragraph.style = doc.styles['CustomHeading'] | ||||
| 
 | ||||
|         # 设置段落格式 | ||||
|         else: | ||||
|             paragraph.style = doc.styles['CustomBody'] | ||||
| 
 | ||||
|     # 遍历所有图片 | ||||
|     for rel in doc.part.rels.values(): | ||||
|         if "image" in rel.target_ref: | ||||
|             # 获取图片所在的段落 | ||||
|             for paragraph in doc.paragraphs: | ||||
|                 for run in paragraph.runs: | ||||
|                     if run._element.tag.endswith('}pict'): | ||||
|                         # 设置图片居中 | ||||
|                         paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER | ||||
|                         # 设置图片环绕方式为上下环绕 | ||||
|                         set_picture_wrapping(paragraph) | ||||
|                         paragraph.paragraph_format.space_before = Pt(12) | ||||
|                         paragraph.paragraph_format.space_after = Pt(12) | ||||
| 
 | ||||
|     # output_filename = remove_book_titles(output_filename) | ||||
| 
 | ||||
|     # 保存文档 | ||||
|     doc.save(output_filename) | ||||
| 
 | ||||
| 
 | ||||
| def crop_and_replace_images(folder_path): | ||||
|     """ | ||||
|     修改图片尺寸 | ||||
|     :param folder_path: | ||||
|     :return: | ||||
|     """ | ||||
|     folder_path = folder_path.strip() | ||||
|     # 遍历文件夹中的所有文件 | ||||
|     if not os.path.exists(folder_path): | ||||
|         os.mkdir(folder_path) | ||||
|     else: | ||||
|         for filename in os.listdir(folder_path): | ||||
|             if os.path.exists(os.path.join(folder_path, filename)):  # check the full path, not the bare filename | ||||
|                 # 检查文件扩展名是否为图片格式 | ||||
|                 if filename.lower().endswith(('.jpg','.png')): | ||||
|                     # 拼接完整的文件路径 | ||||
|                     file_path = os.path.join(folder_path, filename) | ||||
|                     print("文件夹路径:" + folder_path) | ||||
|                     print("文件路径:" + file_path) | ||||
|                     # 打开图片 | ||||
|                     with Image.open(file_path) as img: | ||||
|                         # 获取图片的尺寸 | ||||
|                         width, height = img.size | ||||
|                         # 裁剪图片,裁掉下方20%的高度 | ||||
|                         cropped_img = img.crop((0, 0, width, height - (height * 0.2))) | ||||
|                         # 保存裁剪后的图片,覆盖原文件 | ||||
|                         output_path = file_path[0:file_path.find('.')] + '.png' | ||||
|                         cropped_img.save(output_path, 'PNG') | ||||
| 
 | ||||
| 
 | ||||
| def split_text_into_paragraphs(text): | ||||
|     """ | ||||
|      将文本分割成段落,并在每个段落之间加一个空行 | ||||
|      :param text: 输入的文本 | ||||
|      :return: 段落列表 | ||||
|      """ | ||||
|     paragraphs = text.split('\n\n') | ||||
|     # 过滤掉空行和只包含空白字符的段落 | ||||
|     paragraphs = list(filter(lambda p: p.strip(), paragraphs)) | ||||
| 
 | ||||
|     # 在每个段落之间加一个空行 | ||||
|     paragraphs_with_blank_lines = [] | ||||
|     for paragraph in paragraphs: | ||||
|         paragraphs_with_blank_lines.append(paragraph) | ||||
|         paragraphs_with_blank_lines.append('') | ||||
| 
 | ||||
|     # Remove the trailing blank line | ||||
|     if paragraphs_with_blank_lines: | ||||
|         paragraphs_with_blank_lines.pop() | ||||
| 
 | ||||
|     return paragraphs_with_blank_lines | ||||
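| 
 | ||||
| # Example of the expected behaviour (illustrative): | ||||
| #   split_text_into_paragraphs("第一段\n\n第二段")  ->  ["第一段", "", "第二段"] | ||||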
| 
 | ||||
| 
 | ||||
| def insert_images_into_paragraphs(paragraphs, image_folder, doc, title): | ||||
|     """ | ||||
|     将图片插入到段落中 | ||||
|     :param paragraphs: | ||||
|     :param image_folder: | ||||
|     :param doc: | ||||
|     :return: | ||||
|     """ | ||||
| 
 | ||||
|     # Collect and sort the image list for this article | ||||
|     if os.path.exists(image_folder): | ||||
|         images = sorted([os.path.join(image_folder, img) for img in os.listdir(image_folder) if | ||||
|                          img.lower().endswith('.jpg')]) | ||||
|         # (alternative: also accept '.png' and '.jpeg' files here) | ||||
|     else: | ||||
|         images = [] | ||||
| 
 | ||||
|     total_images = len(images) | ||||
| 
 | ||||
|     image_index = 0 | ||||
|     for i, paragraph in enumerate(paragraphs): | ||||
| 
 | ||||
|         if "正文:" in paragraph: | ||||
|             paragraph = paragraph.replace("正文:", '') | ||||
|         p = doc.add_paragraph(paragraph) | ||||
|         if os.path.exists(image_folder): | ||||
|             # Insert the next image, if any remain | ||||
|             if image_index < total_images: | ||||
|                 img_path = images[image_index] | ||||
| 
 | ||||
|                 # Make sure the image path is valid and the file exists | ||||
|                 if os.path.exists(img_path): | ||||
|                     try: | ||||
|                         with Image.open(img_path) as img: | ||||
|                             width, height = img.size | ||||
|                             doc.add_picture(img_path, width=Inches(width / height * 1.5)) | ||||
|                             image_index += 1 | ||||
|                     except Exception as e: | ||||
|                         print(f"无法识别图像: {img_path}, 错误: {e}") | ||||
|                         continue | ||||
|                 else: | ||||
|                     print(f"图片路径无效: {img_path}") | ||||
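| 
 | ||||
| # Note on the sizing above: add_picture keeps the aspect ratio, so width = (width / height) * 1.5 in | ||||
| # pins the rendered height at roughly 1.5 inches (e.g. a 1600x900 picture comes out ~2.67 in wide). | ||||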
| 
 | ||||
| 
 | ||||
| def create_word_document(text, image_folder, output_path, title): | ||||
|     """ | ||||
|     创建Word文档 | ||||
|     :param text: | ||||
|     :param image_folder: | ||||
|     :param output_path: | ||||
|     :return: | ||||
|     """ | ||||
|     try: | ||||
|         doc = Document() | ||||
|         paragraphs = split_text_into_paragraphs(text) | ||||
|         insert_images_into_paragraphs(paragraphs, image_folder, doc, title) | ||||
|         # modify_document(doc) | ||||
|         doc.save(output_path) | ||||
|         try: | ||||
|             format_word_document(output_path, output_path) | ||||
|         except Exception as e: | ||||
|             print(f"格式化文档 {output_path} 时出错: {e}") | ||||
|         print(f'文档已保存到: {output_path}') | ||||
|     except Exception as e: | ||||
|         print(f"创建文档 {output_path} 时出错: {e}") | ||||
| 
 | ||||
| 
 | ||||
| # Read the contents of a txt file at the given path | ||||
| def read_text_file(file_path): | ||||
|     """ | ||||
|     Read the contents of a txt file at the given path | ||||
|     :param file_path: path of the text file | ||||
|     :return: file contents, or an empty string on failure | ||||
|     """ | ||||
|     try: | ||||
|         with open(file_path, 'r', encoding='utf-8') as file: | ||||
|             return file.read() | ||||
|     except Exception as e: | ||||
|         print(f"读取文件 {file_path} 时出错: {e}") | ||||
|         return "" | ||||
| 
 | ||||
| 
 | ||||
| def get_file_name(file_path): | ||||
|     """ | ||||
|     获取文件名 | ||||
|     :param file_path: | ||||
|     :return: | ||||
|     """ | ||||
|     return os.path.basename(file_path) | ||||
| 
 | ||||
| 
 | ||||
| def apply_random_style(paragraph): | ||||
|     """Apply one randomly chosen emphasis style (bold/italic/underline/colour/background) to each run.""" | ||||
|     from docx.oxml import OxmlElement  # local imports: only needed for the background-shading branch | ||||
|     from docx.oxml.ns import qn | ||||
| 
 | ||||
|     # Predefined font colours | ||||
|     predefined_font_colors = [ | ||||
|         RGBColor(255, 0, 0),  # red | ||||
|         RGBColor(255, 165, 0),  # orange | ||||
|         RGBColor(128, 0, 128),  # purple | ||||
|     ] | ||||
| 
 | ||||
|     # Predefined background colours (hand-picked RGB values, neither too bright nor too dark) | ||||
|     predefined_bg_colors = [ | ||||
|         RGBColor(240, 240, 240),  # light grey | ||||
|         RGBColor(255, 255, 224),  # light yellow | ||||
|         RGBColor(224, 255, 224),  # light green | ||||
|         RGBColor(224, 255, 255),  # light cyan | ||||
|         RGBColor(255, 228, 225),  # light pink | ||||
|         RGBColor(240, 248, 255),  # light blue | ||||
|     ] | ||||
| 
 | ||||
|     # Walk every run in the paragraph (a run is a stretch of text with uniform formatting) | ||||
|     for run in paragraph.runs: | ||||
|         # Pick a style at random | ||||
|         style_choice = random.choice(['bold', 'italic', 'underline', 'color', 'background']) | ||||
| 
 | ||||
|         if style_choice == 'bold': | ||||
|             run.bold = True | ||||
|         elif style_choice == 'italic': | ||||
|             run.italic = True | ||||
|         elif style_choice == 'underline': | ||||
|             run.underline = WD_UNDERLINE.SINGLE | ||||
|         elif style_choice == 'color': | ||||
|             # Pick one of the predefined font colours | ||||
|             run.font.color.rgb = random.choice(predefined_font_colors) | ||||
|         elif style_choice == 'background': | ||||
|             # Shade the run with one of the predefined background colours. | ||||
|             # run.font has no RGB highlight property, so add a w:shd element to the run properties. | ||||
|             shd = OxmlElement('w:shd') | ||||
|             shd.set(qn('w:val'), 'clear') | ||||
|             shd.set(qn('w:fill'), str(random.choice(predefined_bg_colors))) | ||||
|             run._element.get_or_add_rPr().append(shd) | ||||
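| 
 | ||||
| # Minimal usage sketch for apply_random_style (illustrative only; the file names and the | ||||
| # "every third paragraph" rule below are assumptions, not behaviour of this project): | ||||
| # | ||||
| #   demo_doc = Document("demo.docx") | ||||
| #   for i, para in enumerate(demo_doc.paragraphs): | ||||
| #       if i % 3 == 0 and para.runs: | ||||
| #           apply_random_style(para) | ||||
| #   demo_doc.save("demo_styled.docx") | ||||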
| 
 | ||||
| 
 | ||||
| def txt2docx(txt_path, image_path, keep_txt=True): | ||||
|     file_path = txt_path | ||||
|     try: | ||||
|         txts = sorted([os.path.join(file_path, txt) for txt in os.listdir(file_path) if | ||||
|                    txt.lower().endswith(('txt'))]) | ||||
|     except Exception as e: | ||||
|         print(f"读取文件夹 {file_path} 时出错: {e}") | ||||
|         sg.popup_error(f"读取文件夹 {file_path} 时出错: {e}") | ||||
|         return | ||||
|          | ||||
|     img_path = image_path | ||||
| 
 | ||||
|     for txt in txts: | ||||
|         try: | ||||
|             print("正在修改:" + txt) | ||||
|             text = read_text_file(txt) | ||||
|             if not text:  # skip this file if reading failed | ||||
|                 print(f"跳过文件: {txt} (读取失败)") | ||||
|                 continue | ||||
|                  | ||||
|             # print(text) | ||||
|             txt_name = get_file_name(txt) | ||||
|             title_name = txt_name.replace(".txt", "") | ||||
|             title = title_name | ||||
|             print(title) | ||||
|             if "正文:" in text: | ||||
|                 new_text = text.split('正文:')[1].replace("```markdown", "").replace("```", "") | ||||
|             else: | ||||
|                 new_text = text.replace("```markdown", "").replace("```", "") | ||||
|             content = new_text | ||||
|              | ||||
|             from pathlib import Path | ||||
| 
 | ||||
|             img_path = Path(img_path) | ||||
|             image_folder = img_path / txt_name.replace(".txt", "").rstrip(".") | ||||
| 
 | ||||
|             # crop_and_replace_images(image_folder) | ||||
| 
 | ||||
|             create_word_document(content, image_folder, txt.replace(".txt", ".docx"), title_name) | ||||
| 
 | ||||
|             # Delete or keep the original txt file, depending on the user's choice | ||||
|             if not keep_txt: | ||||
|                 try: | ||||
|                     os.remove(txt) | ||||
|                     print(f"已删除原始文件: {txt}") | ||||
|                 except Exception as e: | ||||
|                     print(f"删除文件 {txt} 时出错: {e}") | ||||
|             else: | ||||
|                 print(f"保留原始文件: {txt}") | ||||
|         except Exception as e: | ||||
|             print(f"处理文件 {txt} 时出错: {e}") | ||||
|             continue  # 继续处理下一个文件 | ||||
| 
 | ||||
| 
 | ||||
| # Load saved settings | ||||
| def load_settings(): | ||||
|     if os.path.exists(SETTINGS_FILE): | ||||
|         with open(SETTINGS_FILE, 'r') as f: | ||||
|             return json.load(f) | ||||
|     return {'folder1': '', 'folder2': ''} | ||||
| 
 | ||||
| 
 | ||||
| # Save settings | ||||
| def save_settings(settings): | ||||
|     with open(SETTINGS_FILE, 'w') as f: | ||||
|         json.dump(settings, f) | ||||
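| 
 | ||||
| # The settings file is plain JSON; with the keys used in this script it looks roughly like | ||||
| # (paths shown are placeholders): | ||||
| #   {"folder1": "D:/articles", "folder2": "D:/images", "keep_txt": true} | ||||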
| 
 | ||||
| 
 | ||||
| # Helper that processes the folders chosen by the user | ||||
| def process_folders(folder1, folder2, keep_txt=True): | ||||
|     # Make sure both folders exist | ||||
|     if not os.path.exists(folder1): | ||||
|         sg.popup_error(f"文章文件夹不存在: {folder1}") | ||||
|         return | ||||
|     if not os.path.exists(folder2): | ||||
|         sg.popup_error(f"图片文件夹不存在: {folder2}") | ||||
|         return | ||||
|          | ||||
|     # Do the actual folder processing here | ||||
|     try: | ||||
|         txt2docx(folder1, folder2, keep_txt) | ||||
|         sg.popup("处理完成!") | ||||
|     except Exception as e: | ||||
|         sg.popup_error(f"处理过程中出错: {e}") | ||||
| 
 | ||||
| 
 | ||||
| # Load the previously saved settings | ||||
| settings = load_settings() | ||||
| if 'keep_txt' not in settings: | ||||
|     settings['keep_txt'] = True | ||||
| 
 | ||||
| # Define the window layout | ||||
| layout = [ | ||||
|     [sg.Text('文章文件夹:'), sg.Input(default_text=settings['folder1']), sg.FolderBrowse()], | ||||
|     [sg.Text('图片文件夹:'), sg.Input(default_text=settings['folder2']), sg.FolderBrowse()], | ||||
|     [sg.Checkbox('保留原始txt文件', default=settings['keep_txt'], key='keep_txt')], | ||||
|     [sg.Button('确认'), sg.Button('取消')] | ||||
| ] | ||||
| 
 | ||||
| # Create the window | ||||
| window = sg.Window('文件夹选择窗口', layout) | ||||
| 
 | ||||
| # Event loop | ||||
| while True: | ||||
|     event, values = window.read() | ||||
|     if event == sg.WIN_CLOSED or event == '取消':  # user closed the window or clicked Cancel | ||||
|         break | ||||
|     elif event == '确认':  # user clicked Confirm | ||||
|         folder1 = values[0] | ||||
|         folder2 = values[1] | ||||
|         keep_txt = values['keep_txt'] | ||||
|         process_folders(folder1, folder2, keep_txt) | ||||
|         # Remember the selected folders and the keep-txt option | ||||
|         settings['folder1'] = folder1 | ||||
|         settings['folder2'] = folder2 | ||||
|         settings['keep_txt'] = keep_txt | ||||
|         save_settings(settings) | ||||
| 
 | ||||
| # Close the window | ||||
| window.close() | ||||
| @ -89,13 +89,62 @@ def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'): | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # Read the contents of one column of an Excel sheet and return it as a list | ||||
| # Read the link column and the type column of an Excel sheet and return them as a list of tuples | ||||
| def read_excel(file_name): | ||||
|     datas = pd.read_excel(file_name) | ||||
|     first_column_name = datas.columns[0] | ||||
|     first_colunm_data = datas[first_column_name].tolist() | ||||
|     print(first_colunm_data) | ||||
| 
 | ||||
|     return first_colunm_data | ||||
|     first_column_name = datas.columns[0]  # link column | ||||
|     type_column_name = '类型'  # type column | ||||
|      | ||||
|     links = datas[first_column_name].tolist() | ||||
|     # Read the type column if it exists, otherwise fall back to the default type | ||||
|     types = datas[type_column_name].tolist() if type_column_name in datas.columns else ['默认'] * len(links) | ||||
|      | ||||
|     # Combine links and types into a list of tuples | ||||
|     result = list(zip(links, types)) | ||||
|     print(result) | ||||
|      | ||||
|     return result | ||||
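| 
 | ||||
| # Minimal usage sketch for read_excel (illustrative only; the file name is an assumption): | ||||
| # | ||||
| #   for link, article_type in read_excel("links.xlsx"): | ||||
| #       print(article_type, link) | ||||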
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| from typing import Tuple | ||||
| 
 | ||||
| 
 | ||||
| def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]: | ||||
|     """ | ||||
|     Enhanced version: handle duplicate file names in a folder, covering more complex cases | ||||
| 
 | ||||
|     Args: | ||||
|         folder_path: folder path | ||||
|         filename: original file name | ||||
| 
 | ||||
|     Returns: | ||||
|         Tuple[str, bool]: (resolved file name, whether it was renamed) | ||||
|     """ | ||||
|     base, ext = os.path.splitext(filename) | ||||
|     target_path = os.path.join(folder_path, filename) | ||||
| 
 | ||||
|     if not os.path.exists(target_path): | ||||
|         return filename, False | ||||
| 
 | ||||
|     existing_files = set(os.listdir(folder_path)) | ||||
|     pattern = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(base), re.escape(ext))) | ||||
| 
 | ||||
|     # Find every matching file and extract its numeric suffix | ||||
|     numbers = [] | ||||
|     for f in existing_files: | ||||
|         match = pattern.match(f) | ||||
|         if match: | ||||
|             num = int(match.group(2)) if match.group(2) else 0 | ||||
|             numbers.append(num) | ||||
| 
 | ||||
|     next_num = max(numbers) + 1 if numbers else 1 | ||||
|     new_filename = f"{base}_{next_num}{ext}" | ||||
| 
 | ||||
|     # Make sure the new file name is not taken either (handles the concurrent case) | ||||
|     while new_filename in existing_files: | ||||
|         next_num += 1 | ||||
|         new_filename = f"{base}_{next_num}{ext}" | ||||
| 
 | ||||
|     return new_filename, True | ||||
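| 
 | ||||
| # Minimal usage sketch (illustrative only; the folder, file name and `data` bytes are assumptions): | ||||
| # | ||||
| #   save_dir = "downloads" | ||||
| #   name, renamed = handle_duplicate_files_advanced(save_dir, "article.docx") | ||||
| #   if renamed: | ||||
| #       print(f"name already taken, saving as {name}") | ||||
| #   with open(os.path.join(save_dir, name), "wb") as fh: | ||||
| #       fh.write(data) | ||||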
							
								
								
									
8  text translation/.idea/.gitignore  vendored  Normal file
							| @ -0,0 +1,8 @@ | ||||
| # Default ignored files | ||||
| /shelf/ | ||||
| /workspace.xml | ||||
| # Editor-based HTTP Client requests | ||||
| /httpRequests/ | ||||
| # Datasource local storage ignored files | ||||
| /dataSources/ | ||||
| /dataSources.local.xml | ||||