第一次提交
This commit is contained in:
185
app/utils/file_security.py
Normal file
185
app/utils/file_security.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""
|
||||
文件上传安全检查工具
|
||||
防止恶意文件上传和攻击
|
||||
"""
|
||||
import os
|
||||
import mimetypes
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional
|
||||
import hashlib
|
||||
import uuid
|
||||
from flask import current_app
|
||||
|
||||
|
||||
ALLOWED_EXTENSIONS = {
|
||||
# 图片
|
||||
'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp',
|
||||
# 文档
|
||||
'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt', 'csv',
|
||||
# 压缩包
|
||||
'zip', 'rar', '7z', 'tar', 'gz',
|
||||
# 其他
|
||||
'json', 'xml'
|
||||
}
|
||||
|
||||
BLOCKED_EXTENSIONS = {
|
||||
# 可执行文件
|
||||
'exe', 'bat', 'cmd', 'com', 'pif', 'scr', 'vbs', 'js', 'jar',
|
||||
# 脚本
|
||||
'sh', 'py', 'php', 'asp', 'aspx', 'jsp',
|
||||
# 系统文件
|
||||
'sys', 'dll', 'so', 'dylib',
|
||||
# 其他危险文件
|
||||
'html', 'htm', 'php', 'asp', 'aspx', 'jsp'
|
||||
}
|
||||
|
||||
|
||||
def get_file_extension(filename: str) -> str:
|
||||
"""获取文件扩展名(小写)"""
|
||||
return Path(filename).suffix.lower().lstrip('.')
|
||||
|
||||
|
||||
def check_file_extension(filename: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
检查文件扩展名
|
||||
:param filename: 文件名
|
||||
:return: (是否允许, 错误消息)
|
||||
"""
|
||||
ext = get_file_extension(filename)
|
||||
|
||||
# 检查是否在阻止列表中
|
||||
if ext in BLOCKED_EXTENSIONS:
|
||||
return False, f"不允许上传.{ext}类型的文件"
|
||||
|
||||
# 检查是否在允许列表中
|
||||
if ext not in ALLOWED_EXTENSIONS:
|
||||
return False, f"不支持的文件类型: .{ext}"
|
||||
|
||||
return True, "文件类型允许"
|
||||
|
||||
|
||||
def check_file_signature(file_path: str, allowed_mimetypes: set) -> Tuple[bool, str]:
|
||||
"""
|
||||
检查文件签名(魔数)
|
||||
:param file_path: 文件路径
|
||||
:param allowed_mimetypes: 允许的MIME类型集合
|
||||
:return: (是否通过, 错误消息)
|
||||
"""
|
||||
try:
|
||||
# 读取文件头部(通常前20字节足够识别文件类型)
|
||||
with open(file_path, 'rb') as f:
|
||||
header = f.read(20)
|
||||
|
||||
# 根据文件头部判断文件类型
|
||||
# 这里只是简单示例,实际应该根据具体文件类型实现
|
||||
if header.startswith(b'\x89PNG\r\n\x1a\n'):
|
||||
mimetype = 'image/png'
|
||||
elif header.startswith(b'\xff\xd8\xff'):
|
||||
mimetype = 'image/jpeg'
|
||||
elif header.startswith(b'GIF87a') or header.startswith(b'GIF89a'):
|
||||
mimetype = 'image/gif'
|
||||
elif header.startswith(b'%PDF'):
|
||||
mimetype = 'application/pdf'
|
||||
elif header.startswith(b'PK'):
|
||||
mimetype = 'application/zip'
|
||||
else:
|
||||
mimetype = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
|
||||
|
||||
# 检查MIME类型是否允许
|
||||
if mimetype not in allowed_mimetypes:
|
||||
return False, f"文件签名验证失败: 检测到{mimetype},但不在允许列表中"
|
||||
|
||||
return True, "文件签名验证通过"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"文件签名检查失败: {str(e)}"
|
||||
|
||||
|
||||
def generate_safe_filename(original_filename: str) -> str:
|
||||
"""
|
||||
生成安全的文件名
|
||||
:param original_filename: 原始文件名
|
||||
:return: 安全的文件名
|
||||
"""
|
||||
# 获取文件扩展名
|
||||
ext = get_file_extension(original_filename)
|
||||
|
||||
# 生成唯一文件名(使用UUID)
|
||||
unique_name = str(uuid.uuid4())
|
||||
|
||||
# 组合新的文件名
|
||||
safe_filename = f"{unique_name}.{ext}" if ext else unique_name
|
||||
|
||||
return safe_filename
|
||||
|
||||
|
||||
def check_file_size(file_path: str, max_size: int = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
检查文件大小
|
||||
:param file_path: 文件路径
|
||||
:param max_size: 最大大小(字节),None表示使用配置中的值
|
||||
:return: (是否通过, 错误消息)
|
||||
"""
|
||||
if max_size is None:
|
||||
max_size = current_app.config.get('MAX_CONTENT_LENGTH', 50 * 1024 * 1024) # 默认50MB
|
||||
|
||||
try:
|
||||
file_size = os.path.getsize(file_path)
|
||||
if file_size > max_size:
|
||||
size_mb = file_size / (1024 * 1024)
|
||||
max_size_mb = max_size / (1024 * 1024)
|
||||
return False, f"文件大小超出限制: {size_mb:.2f}MB > {max_size_mb:.2f}MB"
|
||||
|
||||
return True, f"文件大小检查通过: {file_size / (1024 * 1024):.2f}MB"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"文件大小检查失败: {str(e)}"
|
||||
|
||||
|
||||
def secure_file_upload(file_storage, upload_folder: str, allowed_mimetypes: set) -> Tuple[bool, str, str]:
|
||||
"""
|
||||
安全的文件上传
|
||||
:param file_storage: Flask的FileStorage对象
|
||||
:param upload_folder: 上传目录
|
||||
:param allowed_mimetypes: 允许的MIME类型集合
|
||||
:return: (是否成功, 消息, 文件路径)
|
||||
"""
|
||||
try:
|
||||
# 检查文件名
|
||||
if not file_storage.filename:
|
||||
return False, "文件名不能为空", ""
|
||||
|
||||
# 检查文件扩展名
|
||||
allowed, msg = check_file_extension(file_storage.filename)
|
||||
if not allowed:
|
||||
return False, msg, ""
|
||||
|
||||
# 生成安全的文件名
|
||||
safe_filename = generate_safe_filename(file_storage.filename)
|
||||
file_path = os.path.join(upload_folder, safe_filename)
|
||||
|
||||
# 确保上传目录存在
|
||||
os.makedirs(upload_folder, exist_ok=True)
|
||||
|
||||
# 保存文件
|
||||
file_storage.save(file_path)
|
||||
|
||||
# 检查文件大小
|
||||
allowed, msg = check_file_size(file_path)
|
||||
if not allowed:
|
||||
# 删除已保存的文件
|
||||
os.remove(file_path)
|
||||
return False, msg, ""
|
||||
|
||||
# 检查文件签名
|
||||
allowed, msg = check_file_signature(file_path, allowed_mimetypes)
|
||||
if not allowed:
|
||||
# 删除已保存的文件
|
||||
os.remove(file_path)
|
||||
return False, msg, ""
|
||||
|
||||
return True, "文件上传成功", file_path
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"文件上传失败: {str(e)}")
|
||||
return False, f"文件上传失败: {str(e)}", ""
|
||||
Reference in New Issue
Block a user