172 lines
5.0 KiB
Python
172 lines
5.0 KiB
Python
import json
|
||
|
||
import re
|
||
|
||
import pandas as pd
|
||
import requests
|
||
from config import *
|
||
|
||
|
||
def text_detection(text):
    """
    Check a piece of text with Baidu's user-defined text-censoring API.

    The text is posted as the ``text`` form field to the
    ``text_censor/v2/user_defined`` endpoint, authenticated with the token
    returned by ``get_baidu_access_token()``. The decoded JSON reply is
    printed and its ``conclusion`` field is returned.

    :param text: the text to be reviewed
    :return: the ``conclusion`` value of the reply, or ``'合规'``
        ("compliant") when the reply carries no such field
    :raises requests.RequestException: if the request fails or times out
    :raises json.JSONDecodeError: if the reply body is not valid JSON
    """
    url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }

    # Pass the form field and the token as mappings so that requests
    # percent-encodes them. A hand-built 'text=' + text body is sent verbatim,
    # so any '&', '=', '+' or '%' inside the text would corrupt the form.
    response = requests.request(
        "POST",
        url,
        params={'access_token': get_baidu_access_token()},
        headers=headers,
        data={'text': text},
        timeout=30,  # do not hang forever if the service stops answering
    )
    data = json.loads(response.text)
    print(data)

    # NOTE(review): a reply without 'conclusion' is reported as '合规', so a
    # reply that lacks the field for any reason is treated as a pass --
    # confirm this fail-open default is intended.
    return data.get('conclusion', '合规')
|
||
|
||
|
||
def get_baidu_access_token():
    """
    Request an access token from Baidu's OAuth endpoint using the AK/SK pair.

    The API key and secret key are read from ``CONFIG['Baidu']`` and sent as
    query parameters of a ``client_credentials`` grant request.

    :return: the ``access_token`` field of the JSON reply passed through
        ``str()``; when the reply has no such field the result is therefore
        the string ``'None'``, not the ``None`` object
    """
    api_key = CONFIG['Baidu']['api_key']
    secret_key = CONFIG['Baidu']['secret_key']

    query = {
        "grant_type": "client_credentials",
        "client_id": api_key,
        "client_secret": secret_key,
    }
    reply = requests.post("https://aip.baidubce.com/oauth/2.0/token", params=query)
    token = reply.json().get("access_token")
    return str(token)
|
||
|
||
|
||
def safe_filename(filename):
    """
    Sanitize a file name by replacing or removing unsafe characters.

    Every character that Windows does not allow in a file name is replaced
    with an underscore: ``< > : " / | ? *``, the backslash, and the ASCII
    control characters 0-31 (newline, tab, NUL, ...). Leading and trailing
    dots and spaces are then removed.

    :param filename: the raw file name
    :return: the sanitized name, or ``'untitled'`` if nothing is left
    """
    # Printable characters rejected by Windows, plus the control range 0-31,
    # which is equally invalid there (and NUL is invalid everywhere).
    forbidden = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
    cleaned = filename.translate(dict.fromkeys(map(ord, forbidden), '_'))

    # Strip leading/trailing dots and spaces.
    cleaned = cleaned.strip('. ')

    # Fall back to a default name when the result is empty.
    return cleaned or 'untitled'
|
||
|
||
def safe_open_directory(directory_path):
    """
    Create a directory, including any missing parent directories.

    The path is normalized first. If nothing exists at the normalized path,
    the directory tree is created and the final directory's mode is set to
    0o777; an existing path is left untouched. Any exception is logged via
    ``logging.error`` and re-raised.

    :param directory_path: path of the directory to create
    :return: None
    """
    try:
        normalized = os.path.normpath(directory_path)
        if os.path.exists(normalized):
            # Nothing to create, and permissions of existing paths are kept.
            return
        os.makedirs(normalized, exist_ok=True)
        os.chmod(normalized, 0o777)
    except Exception as err:
        # Log the failure, then let the caller see the original exception.
        logging.error(f"创建目录失败: {err}")
        raise
|
||
|
||
|
||
|
||
def check_keywords_in_text(text):
    """
    Report whether the text contains any configured banned word.

    The banned words are read from ``CONFIG['Keywords']['banned_words']``, a
    comma-separated list; whitespace around each entry is ignored. Empty
    entries (an empty setting, a trailing comma, a doubled comma) are
    skipped: the empty string is a substring of every text, so keeping such
    an entry would make every text count as a match.

    :param text: the text to scan
    :return: True if at least one banned word occurs in the text, else False
    """
    raw_entries = CONFIG['Keywords']['banned_words'].split(',')
    banned_words = [entry.strip() for entry in raw_entries]
    return any(word in text for word in banned_words if word)
|
||
|
||
|
||
def extract_content_until_punctuation(text, punctuations=r'[,。!?;]'):
    """
    Return the start of the text up to and including the first punctuation mark.

    :param text: the input text
    :param punctuations: regular-expression pattern that identifies a
        punctuation mark; the default is a character class of comma, full
        stop, exclamation mark, question mark and semicolon
    :return: the text from its beginning through the end of the first match
        of ``punctuations``, with surrounding whitespace stripped; the whole
        stripped text when the pattern does not match
    """
    first_hit = re.search(punctuations, text)

    # The slice ends at match.end(), so the matched mark itself is kept.
    leading = text if first_hit is None else text[:first_hit.end()]
    return leading.strip()
|
||
|
||
|
||
|
||
def read_excel(file_name):
    """
    Read the link column and the type column of an Excel sheet.

    The first column of the sheet is taken as the link column. The type of
    each link comes from the column named '领域' when the sheet has one;
    otherwise every link gets the type '默认'. The resulting list is printed
    before it is returned.

    :param file_name: Excel file passed to ``pd.read_excel``
    :return: list of ``(link, type)`` tuples
    """
    sheet = pd.read_excel(file_name)
    link_column = sheet.columns[0]
    links = sheet[link_column].tolist()

    # Use the type column when present, else fall back to the default type.
    if '领域' in sheet.columns:
        types = sheet['领域'].tolist()
    else:
        types = ['默认'] * len(links)

    pairs = list(zip(links, types))
    print(pairs)
    return pairs
|
||
|
||
|
||
|
||
|
||
from typing import Tuple
|
||
|
||
|
||
def handle_duplicate_files_advanced(folder_path: str, filename: str) -> Tuple[str, bool]:
    """
    Pick a file name that does not collide with an existing file in a folder.

    The name is first sanitized with ``safe_filename``. If no file of that
    name exists in the folder it is returned as is. Otherwise the folder is
    listed, the highest ``_<number>`` suffix already used for the same base
    name and extension is found, and the next number is appended.

    Args:
        folder_path: folder that will hold the file
        filename: the original file name

    Returns:
        Tuple[str, bool]: (file name to use, whether it was renamed)
    """
    # Remove illegal characters before anything else.
    filename = safe_filename(filename)
    stem, suffix = os.path.splitext(filename)

    if not os.path.exists(os.path.join(folder_path, filename)):
        return filename, False

    taken = set(os.listdir(folder_path))
    numbered = re.compile(r'^{}(_(\d+))?{}$'.format(re.escape(stem), re.escape(suffix)))

    # Collect the numeric suffixes in use; the unnumbered name counts as 0.
    hits = (numbered.match(entry) for entry in taken)
    used_numbers = [int(hit.group(2) or 0) for hit in hits if hit]

    counter = max(used_numbers, default=0) + 1
    candidate = f"{stem}_{counter}{suffix}"

    # Keep counting until the candidate is absent from the listing taken above.
    while candidate in taken:
        counter += 1
        candidate = f"{stem}_{counter}{suffix}"

    return candidate, True