""" 安全相关工具 - 使用scrypt替代bcrypt """ from datetime import datetime, timedelta from typing import Any, Union, Optional from jose import jwt from passlib.context import CryptContext from .config import settings # 使用scrypt替代bcrypt pwd_context = CryptContext(schemes=["scrypt"], deprecated="auto") # 获取安全配置 security_settings = settings.security def create_access_token( subject: Union[str, Any], expires_delta: Optional[timedelta] = None ) -> str: """ 创建访问令牌 """ if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta( minutes=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode = {"exp": expire, "sub": str(subject)} encoded_jwt = jwt.encode( to_encode, security_settings.SECRET_KEY, algorithm=security_settings.ALGORITHM ) return encoded_jwt def verify_password(plain_password: str, hashed_password: str) -> bool: """ 验证密码 """ # 为避免bcrypt的72字节限制,对密码进行截断处理 if len(plain_password.encode('utf-8')) > 72: plain_password = plain_password.encode('utf-8')[:72].decode('utf-8', errors='ignore') return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """ 获取密码哈希 """ # 为避免bcrypt的72字节限制,对密码进行截断处理 if len(password.encode('utf-8')) > 72: password = password.encode('utf-8')[:72].decode('utf-8', errors='ignore') return pwd_context.hash(password) def verify_token(token: str) -> Optional[str]: """ 验证令牌并返回用户ID """ try: payload = jwt.decode( token, security_settings.SECRET_KEY, algorithms=[security_settings.ALGORITHM] ) user_id: str = payload.get("sub") if user_id is None: return None return user_id except jwt.JWTError: return None