from datetime import datetime from werkzeug.security import generate_password_hash, check_password_hash from flask_login import UserMixin from app import db, login_manager import hashlib import hmac import base64 class Admin(UserMixin, db.Model): """管理员模型""" __tablename__ = 'admin' admin_id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(32), unique=True, nullable=False, index=True) password_hash = db.Column(db.String(256), nullable=False) email = db.Column(db.String(64), nullable=True) role = db.Column(db.Integer, nullable=False, default=0) # 0=普通管理员, 1=超级管理员 status = db.Column(db.Integer, nullable=False, default=1) # 0=禁用, 1=正常 is_deleted = db.Column(db.Integer, nullable=False, default=0) # 0=未删除, 1=已删除(软删除) create_time = db.Column(db.DateTime, default=datetime.utcnow) update_time = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) delete_time = db.Column(db.DateTime, nullable=True) # 软删除时间 last_login_time = db.Column(db.DateTime, nullable=True) last_login_ip = db.Column(db.String(32), nullable=True) def __init__(self, **kwargs): super(Admin, self).__init__(**kwargs) if self.password_hash is None and 'password' in kwargs: self.set_password(kwargs['password']) def set_password(self, password): """设置密码""" # 使用 pbkdf2:sha256 算法,确保哈希长度在合理范围内 self.password_hash = generate_password_hash(password, method='pbkdf2:sha256') def check_password(self, password): """验证密码 - 简化版本,更可靠""" import logging try: # 基本验证 if not self.password_hash: logging.error(f"密码哈希为空 - 用户ID: {self.admin_id}, 用户名: {self.username}") return False if not password: logging.warning(f"输入的密码为空 - 用户ID: {self.admin_id}, 用户名: {self.username}") return False # 确保密码是字符串类型 if not isinstance(password, str): password = str(password) # 使用 Werkzeug 的标准方法验证密码 result = check_password_hash(self.password_hash, password) if result: logging.debug(f"密码验证成功 - 用户ID: {self.admin_id}, 用户名: {self.username}") return True else: logging.warning(f"密码验证失败 - 用户ID: {self.admin_id}, 用户名: {self.username}") return False except Exception as e: logging.error(f"密码验证异常 - 用户ID: {self.admin_id}, 用户名: {self.username}, 错误: {str(e)}", exc_info=True) return False def is_super_admin(self): """是否为超级管理员""" return self.role == 1 @property def is_active(self): """账号是否激活""" return self.status == 1 def update_last_login(self, ip_address): """更新最后登录信息""" self.last_login_time = datetime.utcnow() self.last_login_ip = ip_address db.session.commit() def get_id(self): """Flask-Login 需要的方法,返回用户唯一标识""" return str(self.admin_id) # 为了兼容性,也可以添加 id 属性 @property def id(self): """Flask-Login 需要的 id 属性""" return self.admin_id def to_dict(self): """转换为字典""" return { 'admin_id': self.admin_id, 'username': self.username, 'email': self.email, 'role': self.role, 'role_name': '超级管理员' if self.role == 1 else '普通管理员', 'status': self.status, 'status_name': '正常' if self.status == 1 else '禁用', 'is_deleted': self.is_deleted, 'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None, 'update_time': self.update_time.strftime('%Y-%m-%d %H:%M:%S') if self.update_time else None, 'delete_time': self.delete_time.strftime('%Y-%m-%d %H:%M:%S') if self.delete_time else None, 'last_login_time': self.last_login_time.strftime('%Y-%m-%d %H:%M:%S') if self.last_login_time else None, 'last_login_ip': self.last_login_ip } def soft_delete(self): """软删除""" self.is_deleted = 1 self.delete_time = datetime.utcnow() @staticmethod def get_query(): """获取未删除的查询""" return Admin.query.filter(Admin.is_deleted == 0) def __repr__(self): return f'' @login_manager.user_loader def load_user(admin_id): """Flask-Login用户加载器""" try: # 只加载未删除且激活的用户 admin = Admin.query.filter_by(admin_id=int(admin_id), is_deleted=0).first() return admin except (TypeError, ValueError): return None