Kamixitong/app/utils/file_security.py

186 lines
5.6 KiB
Python
Raw Normal View History

2025-12-12 11:35:14 +08:00
"""
文件上传安全检查工具
防止恶意文件上传和攻击
"""
import os
import mimetypes
from pathlib import Path
from typing import Tuple, Optional
import hashlib
import uuid
from flask import current_app
ALLOWED_EXTENSIONS = {
# 图片
'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp',
# 文档
'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt', 'csv',
# 压缩包
'zip', 'rar', '7z', 'tar', 'gz',
# 其他
'json', 'xml'
}
BLOCKED_EXTENSIONS = {
# 可执行文件
'exe', 'bat', 'cmd', 'com', 'pif', 'scr', 'vbs', 'js', 'jar',
# 脚本
'sh', 'py', 'php', 'asp', 'aspx', 'jsp',
# 系统文件
'sys', 'dll', 'so', 'dylib',
# 其他危险文件
'html', 'htm', 'php', 'asp', 'aspx', 'jsp'
}
def get_file_extension(filename: str) -> str:
"""获取文件扩展名(小写)"""
return Path(filename).suffix.lower().lstrip('.')
def check_file_extension(filename: str) -> Tuple[bool, str]:
"""
检查文件扩展名
:param filename: 文件名
:return: (是否允许, 错误消息)
"""
ext = get_file_extension(filename)
# 检查是否在阻止列表中
if ext in BLOCKED_EXTENSIONS:
return False, f"不允许上传.{ext}类型的文件"
# 检查是否在允许列表中
if ext not in ALLOWED_EXTENSIONS:
return False, f"不支持的文件类型: .{ext}"
return True, "文件类型允许"
def check_file_signature(file_path: str, allowed_mimetypes: set) -> Tuple[bool, str]:
"""
检查文件签名魔数
:param file_path: 文件路径
:param allowed_mimetypes: 允许的MIME类型集合
:return: (是否通过, 错误消息)
"""
try:
# 读取文件头部通常前20字节足够识别文件类型
with open(file_path, 'rb') as f:
header = f.read(20)
# 根据文件头部判断文件类型
# 这里只是简单示例,实际应该根据具体文件类型实现
if header.startswith(b'\x89PNG\r\n\x1a\n'):
mimetype = 'image/png'
elif header.startswith(b'\xff\xd8\xff'):
mimetype = 'image/jpeg'
elif header.startswith(b'GIF87a') or header.startswith(b'GIF89a'):
mimetype = 'image/gif'
elif header.startswith(b'%PDF'):
mimetype = 'application/pdf'
elif header.startswith(b'PK'):
mimetype = 'application/zip'
else:
mimetype = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
# 检查MIME类型是否允许
if mimetype not in allowed_mimetypes:
return False, f"文件签名验证失败: 检测到{mimetype},但不在允许列表中"
return True, "文件签名验证通过"
except Exception as e:
return False, f"文件签名检查失败: {str(e)}"
def generate_safe_filename(original_filename: str) -> str:
"""
生成安全的文件名
:param original_filename: 原始文件名
:return: 安全的文件名
"""
# 获取文件扩展名
ext = get_file_extension(original_filename)
# 生成唯一文件名使用UUID
unique_name = str(uuid.uuid4())
# 组合新的文件名
safe_filename = f"{unique_name}.{ext}" if ext else unique_name
return safe_filename
def check_file_size(file_path: str, max_size: int = None) -> Tuple[bool, str]:
"""
检查文件大小
:param file_path: 文件路径
:param max_size: 最大大小字节None表示使用配置中的值
:return: (是否通过, 错误消息)
"""
if max_size is None:
max_size = current_app.config.get('MAX_CONTENT_LENGTH', 50 * 1024 * 1024) # 默认50MB
try:
file_size = os.path.getsize(file_path)
if file_size > max_size:
size_mb = file_size / (1024 * 1024)
max_size_mb = max_size / (1024 * 1024)
return False, f"文件大小超出限制: {size_mb:.2f}MB > {max_size_mb:.2f}MB"
return True, f"文件大小检查通过: {file_size / (1024 * 1024):.2f}MB"
except Exception as e:
return False, f"文件大小检查失败: {str(e)}"
def secure_file_upload(file_storage, upload_folder: str, allowed_mimetypes: set) -> Tuple[bool, str, str]:
"""
安全的文件上传
:param file_storage: Flask的FileStorage对象
:param upload_folder: 上传目录
:param allowed_mimetypes: 允许的MIME类型集合
:return: (是否成功, 消息, 文件路径)
"""
try:
# 检查文件名
if not file_storage.filename:
return False, "文件名不能为空", ""
# 检查文件扩展名
allowed, msg = check_file_extension(file_storage.filename)
if not allowed:
return False, msg, ""
# 生成安全的文件名
safe_filename = generate_safe_filename(file_storage.filename)
file_path = os.path.join(upload_folder, safe_filename)
# 确保上传目录存在
os.makedirs(upload_folder, exist_ok=True)
# 保存文件
file_storage.save(file_path)
# 检查文件大小
allowed, msg = check_file_size(file_path)
if not allowed:
# 删除已保存的文件
os.remove(file_path)
return False, msg, ""
# 检查文件签名
allowed, msg = check_file_signature(file_path, allowed_mimetypes)
if not allowed:
# 删除已保存的文件
os.remove(file_path)
return False, msg, ""
return True, "文件上传成功", file_path
except Exception as e:
current_app.logger.error(f"文件上传失败: {str(e)}")
return False, f"文件上传失败: {str(e)}", ""