2025-10-25 16:45:02 +08:00
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
import requests
|
|
|
|
|
|
from config import *
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def text_detection(text):
    """
    Submit text to Baidu's text-censor endpoint and return its verdict.

    :param text: the text to be checked
    :return: the value of the ``conclusion`` field of the JSON response;
        when the response has no such field, the default ``'合规'`` is
        returned instead
    """
    url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token=" + get_baidu_access_token()
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }

    # Hand the form field to requests as a dict so it is percent-encoded.
    # Concatenating 'text=' + text by hand lets characters such as '&',
    # '=', '%' or '+' in the text break or truncate the form body.
    # The timeout keeps an unresponsive server from blocking the caller
    # forever.
    response = requests.request(
        "POST",
        url,
        headers=headers,
        data={'text': text},
        timeout=10,
    )
    data = json.loads(response.text)
    print(data)

    # Read the conclusion defensively: a response without the field
    # falls back to the default value instead of raising KeyError.
    conclusion = data.get('conclusion', '合规')
    return conclusion
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_baidu_access_token():
    """
    Request an access token from Baidu's OAuth endpoint using the API key
    and secret key stored under ``CONFIG['Baidu']``.

    :return: the ``access_token`` field of the JSON response passed
        through ``str()``. If the response has no such field the result
        is the literal string ``"None"`` (not the ``None`` object), so
        callers can always concatenate it into a URL.
    """
    baidu_config = CONFIG['Baidu']
    params = {
        "grant_type": "client_credentials",
        "client_id": baidu_config['api_key'],
        "client_secret": baidu_config['secret_key'],
    }

    # Without a timeout requests waits indefinitely on a stalled server.
    response = requests.post(
        "https://aip.baidubce.com/oauth/2.0/token",
        params=params,
        timeout=10,
    )
    return str(response.json().get("access_token"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_filename(filename):
    """
    Clean up a file name by replacing characters that are unsafe in file
    names.

    Each of the characters ``<``, ``>``, ``:``, ``"``, ``/``, backslash,
    ``|``, ``?`` and ``*`` is replaced with an underscore, then leading
    and trailing dots and spaces are stripped.

    :param filename: the file name to clean
    :return: the cleaned name, or ``'untitled'`` when nothing is left
    """
    # Characters the Windows file system does not allow in file names.
    to_underscore = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})
    cleaned = filename.translate(to_underscore).strip('. ')
    # An empty result falls back to a default name.
    return cleaned or 'untitled'
|
|
|
|
|
|
|
|
|
|
|
|
def safe_open_directory(directory_path):
    """
    Make sure a directory exists at the given path.

    The path is normalised first. When nothing exists at the normalised
    path, the directory and any missing parents are created and its mode
    is set to ``0o777``. A path that already exists is left untouched.

    :param directory_path: path of the directory to create
    :return: None
    :raises Exception: any error raised while creating the directory is
        logged with ``logging.error`` and then re-raised unchanged
    """
    try:
        normalized = os.path.normpath(directory_path)
        if os.path.exists(normalized):
            return
        os.makedirs(normalized, exist_ok=True)
        os.chmod(normalized, 0o777)
    except Exception as e:
        # Record the failure in the log, then let the caller handle it.
        logging.error(f"创建目录失败: {e}")
        raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_keywords_in_text(text):
    """
    Check whether the text contains any banned word.

    The banned words are read from ``CONFIG['Keywords']['banned_words']``,
    a comma-separated string. Surrounding whitespace of each entry is
    ignored.

    :param text: the text to search
    :return: True if at least one non-empty banned word occurs in the
        text, otherwise False
    """
    raw_keywords = CONFIG['Keywords']['banned_words']
    keywords = (entry.strip() for entry in raw_keywords.split(','))
    # Skip empty entries (empty setting, trailing comma, "a,,b"): the
    # empty string is a substring of every text, so it would otherwise
    # flag all input as containing a banned word.
    return any(keyword and keyword in text for keyword in keywords)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """
    Return the leading part of the text up to and including the first
    punctuation match.

    :param text: the input text
    :param punctuations: regular expression pattern describing the
        punctuation to stop at
    :return: the text from the start through the end of the first match
        (the matched punctuation is kept), with surrounding whitespace
        stripped; the whole stripped text when nothing matches
    """
    first_hit = re.search(punctuations, text)
    # With no match the cut-off is the end of the text, so the slice
    # below keeps everything.
    cutoff = first_hit.end() if first_hit else len(text)
    return text[:cutoff].strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_excel(file_name):
    """
    Read the link column and the type column of an Excel sheet.

    The first column of the sheet is taken as the link column. The type
    is read from the column named ``'领域'``; when the sheet has no such
    column every link gets the type ``'默认'``.

    :param file_name: the Excel file passed to ``pd.read_excel``
    :return: a list of ``(link, type)`` tuples, which is also printed
    """
    frame = pd.read_excel(file_name)
    link_values = frame[frame.columns[0]].tolist()

    domain_column = '领域'
    if domain_column in frame.columns:
        type_values = frame[domain_column].tolist()
    else:
        # No type column in the sheet: use the default type for each link.
        type_values = ['默认'] * len(link_values)

    pairs = list(zip(link_values, type_values))
    print(pairs)

    return pairs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from typing import Tuple
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Pick a file name that does not clash with an existing entry in a
    folder.

    The name is first cleaned with ``safe_filename``. If nothing with
    that name exists in the folder it is returned as is. Otherwise a
    ``_<number>`` suffix is inserted before the extension, using one more
    than the highest number already used by entries of the form
    ``<base><ext>`` (counted as 0) or ``<base>_<number><ext>``.

    :param folder_path: path of the folder to check
    :param filename: the original file name
    :return: a tuple ``(name, renamed)`` where ``renamed`` is True when a
        numeric suffix had to be added
    """
    # Replace illegal characters before doing any comparison.
    cleaned = safe_filename(filename)

    if not os.path.exists(os.path.join(folder_path, cleaned)):
        return cleaned, False

    stem, suffix = os.path.splitext(cleaned)
    taken = set(os.listdir(folder_path))
    numbered = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(stem), re.escape(suffix)))

    # Collect the number of every matching entry; the bare name counts
    # as 0.
    hits = (numbered.match(entry) for entry in taken)
    used_numbers = [int(hit.group(2) or 0) for hit in hits if hit]

    counter = max(used_numbers, default=0) + 1
    candidate = f"{stem}_{counter}{suffix}"

    # Keep counting up while the candidate is among the listed entries.
    while candidate in taken:
        counter += 1
        candidate = f"{stem}_{counter}{suffix}"

    return candidate, True
|