from datetime import datetime from werkzeug.security import generate_password_hash, check_password_hash from flask_login import UserMixin from app import db, login_manager import hashlib import hmac import base64 class Admin(UserMixin, db.Model): """管理员模型""" __tablename__ = 'admin' admin_id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(32), unique=True, nullable=False, index=True) password_hash = db.Column(db.String(128), nullable=False) email = db.Column(db.String(64), nullable=True) role = db.Column(db.Integer, nullable=False, default=0) # 0=普通管理员, 1=超级管理员 status = db.Column(db.Integer, nullable=False, default=1) # 0=禁用, 1=正常 is_deleted = db.Column(db.Integer, nullable=False, default=0) # 0=未删除, 1=已删除(软删除) create_time = db.Column(db.DateTime, default=datetime.utcnow) update_time = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) delete_time = db.Column(db.DateTime, nullable=True) # 软删除时间 last_login_time = db.Column(db.DateTime, nullable=True) last_login_ip = db.Column(db.String(32), nullable=True) def __init__(self, **kwargs): super(Admin, self).__init__(**kwargs) if self.password_hash is None and 'password' in kwargs: self.set_password(kwargs['password']) def set_password(self, password): """设置密码""" self.password_hash = generate_password_hash(password) def _manual_pbkdf2_verify(self, password_hash, password): """手动实现 PBKDF2 密码验证,不依赖 Werkzeug 的内部实现""" import logging try: # 确保密码是字节 if isinstance(password, str): password_bytes = password.encode('utf-8') else: password_bytes = password # 解析密码哈希格式 iterations = None salt_bytes = None stored_hash_bytes = None # 格式1: pbkdf2:sha256:iterations$salt$hash (Werkzeug 2.3.7+) if password_hash.startswith('pbkdf2:'): parts = password_hash.split('$') if len(parts) == 3: method_part = parts[0] # pbkdf2:sha256:iterations method_parts = method_part.split(':') if len(method_parts) == 3: iterations = int(method_parts[2]) salt_str = parts[1] hash_str = parts[2] # 新格式使用十六进制编码 try: salt_bytes = bytes.fromhex(salt_str) stored_hash_bytes = bytes.fromhex(hash_str) logging.debug(f"新格式解码成功 - salt长度: {len(salt_bytes)}, hash长度: {len(stored_hash_bytes)}") except ValueError: # 如果不是十六进制,尝试 base64 try: salt_bytes = base64.b64decode(salt_str + '==') stored_hash_bytes = base64.b64decode(hash_str + '==') logging.debug(f"新格式base64解码成功") except Exception as e: logging.error(f"新格式解码失败: {e}") return False # 格式2: $pbkdf2-sha256$iterations$salt$hash (旧格式) elif password_hash.startswith('$pbkdf2-sha256$'): parts = password_hash.split('$') if len(parts) != 5: logging.error(f"旧格式解析失败 - 部分数量: {len(parts)}") return False try: iterations = int(parts[2]) salt_str = parts[3] hash_str = parts[4] logging.debug(f"旧格式解析 - iterations: {iterations}, salt长度: {len(salt_str)}, hash长度: {len(hash_str)}") except (ValueError, IndexError) as e: logging.error(f"旧格式解析失败: {e}") return False # 旧格式使用 base64 编码,但可能包含点号 def decode_base64_safe(s, name="unknown"): """安全解码 base64,处理各种变体""" if not s: logging.warning(f"{name} 为空") return None # 尝试多种解码方式(根据测试,点号替换为+最可能正确) variants = [ (s.replace('.', '+'), "点号替换为+"), # 优先尝试这个 (s, "原始"), (s.replace('.', '/'), "点号替换为/"), (s.replace('.', ''), "移除点号"), (s.replace('.', '='), "点号替换为="), ] for variant, desc in variants: try: # 移除现有填充 variant_clean = variant.rstrip('=') # 添加正确的填充 missing = len(variant_clean) % 4 if missing: variant_clean += '=' * (4 - missing) # 尝试解码 decoded = base64.b64decode(variant_clean, validate=False) if decoded and len(decoded) > 0: logging.debug(f"{name} 解码成功 - 方法: {desc}, 长度: {len(decoded)}") return decoded except Exception as e: logging.debug(f"{name} 解码失败 - 方法: {desc}, 错误: {e}") continue logging.error(f"{name} 所有解码方法都失败 - 字符串: {s[:30]}...") return None salt_bytes = decode_base64_safe(salt_str, "salt") stored_hash_bytes = decode_base64_safe(hash_str, "hash") else: logging.error(f"不支持的密码哈希格式: {password_hash[:30]}...") return False # 检查是否成功解析 if not iterations: logging.error("无法解析迭代次数") return False if not salt_bytes: logging.error("无法解码 salt") return False if not stored_hash_bytes: logging.error("无法解码 hash") return False logging.debug(f"开始计算 PBKDF2 - iterations: {iterations}, salt长度: {len(salt_bytes)}, hash长度: {len(stored_hash_bytes)}") # 使用 PBKDF2 计算哈希 computed_hash = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, iterations) logging.debug(f"PBKDF2 计算完成 - 计算长度: {len(computed_hash)}, 存储长度: {len(stored_hash_bytes)}") # 安全比较 result = hmac.compare_digest(computed_hash, stored_hash_bytes) logging.info(f"手动 PBKDF2 验证结果: {result}") return result except Exception as e: logging.error(f"手动 PBKDF2 验证异常: {str(e)}", exc_info=True) return False def check_password(self, password): """验证密码 - 重构版本,更简单可靠""" import logging try: # 基本验证 if not self.password_hash: logging.error(f"密码哈希为空 - 用户ID: {self.admin_id}, 用户名: {self.username}") return False if not password: logging.warning(f"输入的密码为空 - 用户ID: {self.admin_id}, 用户名: {self.username}") return False # 确保密码是字符串类型 if not isinstance(password, str): password = str(password) # 方法1: 首先尝试使用 Werkzeug 的标准方法 try: result = check_password_hash(self.password_hash, password) if result: # 验证成功,如果使用旧格式,自动迁移到新格式 if self.password_hash.startswith('$pbkdf2-sha256$'): try: new_hash = generate_password_hash(password, method='pbkdf2:sha256') self.password_hash = new_hash db.session.commit() logging.info(f"密码哈希已自动迁移到新格式 - 用户ID: {self.admin_id}, 用户名: {self.username}") except Exception as migrate_error: logging.warning(f"密码哈希迁移失败: {str(migrate_error)}") return True except (TypeError, ValueError, AttributeError) as e: # Werkzeug 标准方法失败,使用手动验证 logging.warning(f"Werkzeug 标准验证失败,使用手动验证: {str(e)}") # 方法2: 使用手动 PBKDF2 验证 result = self._manual_pbkdf2_verify(self.password_hash, password) if result: # 验证成功,迁移到新格式 try: new_hash = generate_password_hash(password, method='pbkdf2:sha256') self.password_hash = new_hash db.session.commit() logging.info(f"密码验证成功,已迁移到新格式 - 用户ID: {self.admin_id}, 用户名: {self.username}") except Exception as migrate_error: logging.warning(f"密码哈希迁移失败: {str(migrate_error)}") else: # 如果手动验证也失败,记录详细信息 logging.warning(f"密码验证失败 - 用户ID: {self.admin_id}, 用户名: {self.username}, 哈希格式: {self.password_hash[:50]}...") # 如果是旧格式且验证失败,可能是密码哈希损坏,建议重置密码 if self.password_hash.startswith('$pbkdf2-sha256$'): logging.warning(f"旧格式密码哈希验证失败,建议重置密码 - 用户ID: {self.admin_id}, 用户名: {self.username}") return result except Exception as e: logging.error(f"密码验证异常 - 用户ID: {self.admin_id}, 用户名: {self.username}, 错误: {str(e)}", exc_info=True) return False def is_super_admin(self): """是否为超级管理员""" return self.role == 1 @property def is_active(self): """账号是否激活""" return self.status == 1 def update_last_login(self, ip_address): """更新最后登录信息""" self.last_login_time = datetime.utcnow() self.last_login_ip = ip_address db.session.commit() def get_id(self): """Flask-Login 需要的方法,返回用户唯一标识""" return str(self.admin_id) # 为了兼容性,也可以添加 id 属性 @property def id(self): """Flask-Login 需要的 id 属性""" return self.admin_id def to_dict(self): """转换为字典""" return { 'admin_id': self.admin_id, 'username': self.username, 'email': self.email, 'role': self.role, 'role_name': '超级管理员' if self.role == 1 else '普通管理员', 'status': self.status, 'status_name': '正常' if self.status == 1 else '禁用', 'is_deleted': self.is_deleted, 'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None, 'update_time': self.update_time.strftime('%Y-%m-%d %H:%M:%S') if self.update_time else None, 'delete_time': self.delete_time.strftime('%Y-%m-%d %H:%M:%S') if self.delete_time else None, 'last_login_time': self.last_login_time.strftime('%Y-%m-%d %H:%M:%S') if self.last_login_time else None, 'last_login_ip': self.last_login_ip } def soft_delete(self): """软删除""" self.is_deleted = 1 self.delete_time = datetime.utcnow() @staticmethod def get_query(): """获取未删除的查询""" return Admin.query.filter(Admin.is_deleted == 0) def __repr__(self): return f'' @login_manager.user_loader def load_user(admin_id): """Flask-Login用户加载器""" try: # 只加载未删除且激活的用户 admin = Admin.query.filter_by(admin_id=int(admin_id), is_deleted=0).first() return admin except (TypeError, ValueError): return None