Kamixitong/app/models/admin.py
2025-11-22 20:32:49 +08:00

292 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from app import db, login_manager
import hashlib
import hmac
import base64
class Admin(UserMixin, db.Model):
"""管理员模型"""
__tablename__ = 'admin'
admin_id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(32), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(128), nullable=False)
email = db.Column(db.String(64), nullable=True)
role = db.Column(db.Integer, nullable=False, default=0) # 0=普通管理员, 1=超级管理员
status = db.Column(db.Integer, nullable=False, default=1) # 0=禁用, 1=正常
is_deleted = db.Column(db.Integer, nullable=False, default=0) # 0=未删除, 1=已删除(软删除)
create_time = db.Column(db.DateTime, default=datetime.utcnow)
update_time = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
delete_time = db.Column(db.DateTime, nullable=True) # 软删除时间
last_login_time = db.Column(db.DateTime, nullable=True)
last_login_ip = db.Column(db.String(32), nullable=True)
def __init__(self, **kwargs):
super(Admin, self).__init__(**kwargs)
if self.password_hash is None and 'password' in kwargs:
self.set_password(kwargs['password'])
def set_password(self, password):
"""设置密码"""
self.password_hash = generate_password_hash(password)
def _manual_pbkdf2_verify(self, password_hash, password):
"""手动实现 PBKDF2 密码验证,不依赖 Werkzeug 的内部实现"""
import logging
try:
# 确保密码是字节
if isinstance(password, str):
password_bytes = password.encode('utf-8')
else:
password_bytes = password
# 解析密码哈希格式
iterations = None
salt_bytes = None
stored_hash_bytes = None
# 格式1: pbkdf2:sha256:iterations$salt$hash (Werkzeug 2.3.7+)
if password_hash.startswith('pbkdf2:'):
parts = password_hash.split('$')
if len(parts) == 3:
method_part = parts[0] # pbkdf2:sha256:iterations
method_parts = method_part.split(':')
if len(method_parts) == 3:
iterations = int(method_parts[2])
salt_str = parts[1]
hash_str = parts[2]
# 新格式使用十六进制编码
try:
salt_bytes = bytes.fromhex(salt_str)
stored_hash_bytes = bytes.fromhex(hash_str)
logging.debug(f"新格式解码成功 - salt长度: {len(salt_bytes)}, hash长度: {len(stored_hash_bytes)}")
except ValueError:
# 如果不是十六进制,尝试 base64
try:
salt_bytes = base64.b64decode(salt_str + '==')
stored_hash_bytes = base64.b64decode(hash_str + '==')
logging.debug(f"新格式base64解码成功")
except Exception as e:
logging.error(f"新格式解码失败: {e}")
return False
# 格式2: $pbkdf2-sha256$iterations$salt$hash (旧格式)
elif password_hash.startswith('$pbkdf2-sha256$'):
parts = password_hash.split('$')
if len(parts) != 5:
logging.error(f"旧格式解析失败 - 部分数量: {len(parts)}")
return False
try:
iterations = int(parts[2])
salt_str = parts[3]
hash_str = parts[4]
logging.debug(f"旧格式解析 - iterations: {iterations}, salt长度: {len(salt_str)}, hash长度: {len(hash_str)}")
except (ValueError, IndexError) as e:
logging.error(f"旧格式解析失败: {e}")
return False
# 旧格式使用 base64 编码,但可能包含点号
def decode_base64_safe(s, name="unknown"):
"""安全解码 base64处理各种变体"""
if not s:
logging.warning(f"{name} 为空")
return None
# 尝试多种解码方式(根据测试,点号替换为+最可能正确)
variants = [
(s.replace('.', '+'), "点号替换为+"), # 优先尝试这个
(s, "原始"),
(s.replace('.', '/'), "点号替换为/"),
(s.replace('.', ''), "移除点号"),
(s.replace('.', '='), "点号替换为="),
]
for variant, desc in variants:
try:
# 移除现有填充
variant_clean = variant.rstrip('=')
# 添加正确的填充
missing = len(variant_clean) % 4
if missing:
variant_clean += '=' * (4 - missing)
# 尝试解码
decoded = base64.b64decode(variant_clean, validate=False)
if decoded and len(decoded) > 0:
logging.debug(f"{name} 解码成功 - 方法: {desc}, 长度: {len(decoded)}")
return decoded
except Exception as e:
logging.debug(f"{name} 解码失败 - 方法: {desc}, 错误: {e}")
continue
logging.error(f"{name} 所有解码方法都失败 - 字符串: {s[:30]}...")
return None
salt_bytes = decode_base64_safe(salt_str, "salt")
stored_hash_bytes = decode_base64_safe(hash_str, "hash")
else:
logging.error(f"不支持的密码哈希格式: {password_hash[:30]}...")
return False
# 检查是否成功解析
if not iterations:
logging.error("无法解析迭代次数")
return False
if not salt_bytes:
logging.error("无法解码 salt")
return False
if not stored_hash_bytes:
logging.error("无法解码 hash")
return False
logging.debug(f"开始计算 PBKDF2 - iterations: {iterations}, salt长度: {len(salt_bytes)}, hash长度: {len(stored_hash_bytes)}")
# 使用 PBKDF2 计算哈希
computed_hash = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, iterations)
logging.debug(f"PBKDF2 计算完成 - 计算长度: {len(computed_hash)}, 存储长度: {len(stored_hash_bytes)}")
# 安全比较
result = hmac.compare_digest(computed_hash, stored_hash_bytes)
logging.info(f"手动 PBKDF2 验证结果: {result}")
return result
except Exception as e:
logging.error(f"手动 PBKDF2 验证异常: {str(e)}", exc_info=True)
return False
def check_password(self, password):
"""验证密码 - 重构版本,更简单可靠"""
import logging
try:
# 基本验证
if not self.password_hash:
logging.error(f"密码哈希为空 - 用户ID: {self.admin_id}, 用户名: {self.username}")
return False
if not password:
logging.warning(f"输入的密码为空 - 用户ID: {self.admin_id}, 用户名: {self.username}")
return False
# 确保密码是字符串类型
if not isinstance(password, str):
password = str(password)
# 方法1: 首先尝试使用 Werkzeug 的标准方法
try:
result = check_password_hash(self.password_hash, password)
if result:
# 验证成功,如果使用旧格式,自动迁移到新格式
if self.password_hash.startswith('$pbkdf2-sha256$'):
try:
new_hash = generate_password_hash(password, method='pbkdf2:sha256')
self.password_hash = new_hash
db.session.commit()
logging.info(f"密码哈希已自动迁移到新格式 - 用户ID: {self.admin_id}, 用户名: {self.username}")
except Exception as migrate_error:
logging.warning(f"密码哈希迁移失败: {str(migrate_error)}")
return True
except (TypeError, ValueError, AttributeError) as e:
# Werkzeug 标准方法失败,使用手动验证
logging.warning(f"Werkzeug 标准验证失败,使用手动验证: {str(e)}")
# 方法2: 使用手动 PBKDF2 验证
result = self._manual_pbkdf2_verify(self.password_hash, password)
if result:
# 验证成功,迁移到新格式
try:
new_hash = generate_password_hash(password, method='pbkdf2:sha256')
self.password_hash = new_hash
db.session.commit()
logging.info(f"密码验证成功,已迁移到新格式 - 用户ID: {self.admin_id}, 用户名: {self.username}")
except Exception as migrate_error:
logging.warning(f"密码哈希迁移失败: {str(migrate_error)}")
else:
# 如果手动验证也失败,记录详细信息
logging.warning(f"密码验证失败 - 用户ID: {self.admin_id}, 用户名: {self.username}, 哈希格式: {self.password_hash[:50]}...")
# 如果是旧格式且验证失败,可能是密码哈希损坏,建议重置密码
if self.password_hash.startswith('$pbkdf2-sha256$'):
logging.warning(f"旧格式密码哈希验证失败,建议重置密码 - 用户ID: {self.admin_id}, 用户名: {self.username}")
return result
except Exception as e:
logging.error(f"密码验证异常 - 用户ID: {self.admin_id}, 用户名: {self.username}, 错误: {str(e)}", exc_info=True)
return False
def is_super_admin(self):
"""是否为超级管理员"""
return self.role == 1
@property
def is_active(self):
"""账号是否激活"""
return self.status == 1
def update_last_login(self, ip_address):
"""更新最后登录信息"""
self.last_login_time = datetime.utcnow()
self.last_login_ip = ip_address
db.session.commit()
def get_id(self):
"""Flask-Login 需要的方法,返回用户唯一标识"""
return str(self.admin_id)
# 为了兼容性,也可以添加 id 属性
@property
def id(self):
"""Flask-Login 需要的 id 属性"""
return self.admin_id
def to_dict(self):
"""转换为字典"""
return {
'admin_id': self.admin_id,
'username': self.username,
'email': self.email,
'role': self.role,
'role_name': '超级管理员' if self.role == 1 else '普通管理员',
'status': self.status,
'status_name': '正常' if self.status == 1 else '禁用',
'is_deleted': self.is_deleted,
'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None,
'update_time': self.update_time.strftime('%Y-%m-%d %H:%M:%S') if self.update_time else None,
'delete_time': self.delete_time.strftime('%Y-%m-%d %H:%M:%S') if self.delete_time else None,
'last_login_time': self.last_login_time.strftime('%Y-%m-%d %H:%M:%S') if self.last_login_time else None,
'last_login_ip': self.last_login_ip
}
def soft_delete(self):
"""软删除"""
self.is_deleted = 1
self.delete_time = datetime.utcnow()
@staticmethod
def get_query():
"""获取未删除的查询"""
return Admin.query.filter(Admin.is_deleted == 0)
def __repr__(self):
return f'<Admin {self.username}>'
@login_manager.user_loader
def load_user(admin_id):
"""Flask-Login用户加载器"""
try:
# 只加载未删除且激活的用户
admin = Admin.query.filter_by(admin_id=int(admin_id), is_deleted=0).first()
return admin
except (TypeError, ValueError):
return None