Files
KaMixitong/app/api/auth.py
2026-03-25 15:24:22 +08:00

578 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hmac
from datetime import datetime, timedelta, timezone

from flask import request, jsonify, current_app

from app import db
from app.models import Product, License, Device, Version
from app.utils.crypto import generate_hash, verify_hash, generate_signature
from app.middleware.rate_limit import rate_limit, ip_key
from . import api_bp


@api_bp.route('/auth/verify', methods=['POST'])
@rate_limit(limit=30, window=60, key_func=ip_key)
def verify_license():
    """Verify a license key for a product / machine pair.

    Expects a JSON body with ``software_id``, ``license_key``,
    ``machine_code``, ``timestamp`` (Unix seconds) and ``signature``.
    ``software_version`` is optional and defaults to ``'1.0.0'``.

    Processing order: parse body -> check required fields -> check that the
    timestamp is within 300 seconds of server time -> check the signature ->
    look up product and license -> ``License.verify`` -> activate when the
    license status is 0 -> refresh the device verify time -> attach the
    latest published version, if any.

    Returns:
        A Flask JSON response ``{'success': bool, 'message': str, ...}``.
        Failures carry an explicit HTTP status (400/401/403/404/500/503);
        success additionally carries a ``data`` dict.
    """
    try:
        # Only warn on an unexpected Content-Type; the body is parsed anyway.
        content_type = request.content_type
        if content_type and 'application/json' not in content_type:
            current_app.logger.warning(f"验证请求Content-Type不正确 - {content_type}")

        # force=True ignores the Content-Type header. silent=True returns
        # None on malformed JSON instead of raising, so the client gets the
        # 400 below rather than falling into the generic 500 handler.
        data = request.get_json(force=True, silent=True)
        if not data or not isinstance(data, dict):
            # Log a bounded prefix of the raw body to help debugging.
            raw_data = request.get_data(as_text=True)
            current_app.logger.warning(f"验证请求:请求数据为空或无法解析 - Content-Type: {content_type}, 原始数据: {raw_data[:200] if raw_data else 'None'}")
            return jsonify({
                'success': False,
                'message': '请求数据为空或格式错误'
            }), 400

        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        timestamp = data.get('timestamp')
        signature = data.get('signature')
        software_version = data.get('software_version', '1.0.0')

        # Collect every missing field so the client sees all of them at once.
        # timestamp is tested against None so a literal 0 is not "missing".
        presence = {
            'software_id': bool(software_id),
            'license_key': bool(license_key),
            'machine_code': bool(machine_code),
            'timestamp': timestamp is not None,
            'signature': bool(signature),
        }
        missing_params = [name for name, present in presence.items() if not present]
        if missing_params:
            current_app.logger.warning(f"验证请求:缺少必要参数 - {', '.join(missing_params)}")
            return jsonify({
                'success': False,
                'message': f'缺少必要参数: {", ".join(missing_params)}'
            }), 400

        # Replay protection: the request timestamp must be recent.
        try:
            if isinstance(timestamp, (str, float)):
                timestamp = int(float(timestamp))
            elif not isinstance(timestamp, int):
                raise ValueError(f"timestamp必须是数字类型实际是: {type(timestamp).__name__}")

            # Both sides are timezone-aware UTC. Mixing a naive local time
            # with a naive UTC time would skew the difference by the
            # server's UTC offset and reject every request on non-UTC hosts.
            request_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            current_time = datetime.now(timezone.utc)
            time_diff = abs((current_time - request_time).total_seconds())
            current_app.logger.debug(f"时间戳验证: 请求时间={request_time}, 当前时间={current_time}, 差值={time_diff:.1f}")
            if time_diff > 300:  # 5-minute validity window
                current_app.logger.warning(f"验证请求:请求已过期 - 时间差: {time_diff:.1f}")
                return jsonify({
                    'success': False,
                    'message': f'请求已过期(时间差: {time_diff:.0f}秒)'
                }), 400
        except (ValueError, TypeError, OSError, OverflowError) as e:
            # OverflowError covers infinite / out-of-range timestamps.
            current_app.logger.error(f"验证请求:时间戳格式错误 - timestamp: {timestamp} ({type(timestamp).__name__}), 错误: {str(e)}")
            return jsonify({
                'success': False,
                'message': f'时间戳格式错误: {str(e)}'
            }), 400

        # Signature check.
        secret_key = current_app.config.get('AUTH_SECRET_KEY')
        if not secret_key:
            current_app.logger.error("AUTH_SECRET_KEY未配置")
            return jsonify({
                'success': False,
                'message': '服务器配置错误:密钥未设置'
            }), 500

        signature_data = f"{software_id}{license_key}{machine_code}{timestamp}"
        expected_signature = generate_signature(signature_data, secret_key)

        # Log only a prefix of the machine code; str() guards against a
        # non-string JSON value.
        current_app.logger.debug(f"验证请求 - software_id: {software_id}, machine_code: {str(machine_code)[:8]}..., timestamp: {timestamp}")

        # Constant-time comparison avoids leaking how many leading characters
        # matched. A non-string on either side is a mismatch, exactly as the
        # plain inequality test treated it.
        signature_ok = (
            isinstance(signature, str)
            and isinstance(expected_signature, str)
            and hmac.compare_digest(
                signature.encode('utf-8'),
                expected_signature.encode('utf-8'),
            )
        )
        if not signature_ok:
            current_app.logger.error("签名验证失败")
            return jsonify({
                'success': False,
                'message': '签名验证失败,请检查密钥配置'
            }), 401

        # Product lookup.
        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({
                'success': False,
                'message': '产品不存在'
            }), 404
        if not product.is_enabled():
            return jsonify({
                'success': False,
                'message': '产品已禁用'
            }), 403

        # License lookup, scoped to the product.
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return jsonify({
                'success': False,
                'message': '卡密不存在'
            }), 404

        success, message = license_obj.verify(machine_code, software_version)
        if not success:
            return jsonify({
                'success': False,
                'message': message
            }), 403

        needs_commit = False

        # First use: status 0 means "not activated".
        if license_obj.status == 0:
            success, message = license_obj.activate(
                machine_code,
                software_version
            )
            if not success:
                return jsonify({
                    'success': False,
                    'message': message
                }), 403
            needs_commit = True

        # Refresh the device's last-verify time when the device is known.
        device = Device.query.filter_by(machine_code=machine_code).first()
        if device:
            device.update_verify_time()
            needs_commit = True

        # One commit so activation and the verify-time update persist together.
        if needs_commit:
            db.session.commit()

        # Latest published version for this product, newest first.
        latest_version = Version.query.filter_by(
            product_id=software_id,
            publish_status=1
        ).order_by(Version.update_time.desc(), Version.create_time.desc()).first()

        response_data = {
            'license_key': license_obj.license_key,
            'type': license_obj.type,
            'type_name': '试用' if license_obj.is_trial() else '正式',
            'expire_time': license_obj.expire_time.isoformat() if license_obj.expire_time else None,
            'remaining_days': license_obj.get_remaining_days(),
            'product_name': product.product_name,
            'activate_time': license_obj.activate_time.isoformat() if license_obj.activate_time else None
        }

        if latest_version:
            response_data.update({
                'new_version': latest_version.version_num,
                'download_url': latest_version.download_url,
                'force_update': latest_version.is_force_update(),
                'update_log': latest_version.update_log,
                # Plain string inequality, not a semantic version comparison.
                'need_update': latest_version.version_num != software_version
            })

        return jsonify({
            'success': True,
            'message': '验证成功',
            'data': response_data
        })
    except Exception as e:
        # Top-level boundary: log with traceback, roll back, and map the
        # failure to a coarse client-facing message without internals.
        import traceback
        error_trace = traceback.format_exc()
        try:
            current_app.logger.error(f"卡密验证失败: {str(e)}\n{error_trace}")
        except Exception:
            # Fall back to stdout if the application logger is unusable.
            print(f"卡密验证失败: {str(e)}\n{error_trace}")

        try:
            db.session.rollback()
        except Exception:
            # Best-effort rollback: record the failure, still answer the client.
            current_app.logger.warning("数据库回滚失败", exc_info=True)

        # Match against both the exception type name and its message. The
        # message alone rarely contains the type name, which previously made
        # the attributeerror / keyerror branches unreachable.
        error_desc = f"{type(e).__name__} {e}".lower()

        if 'operationalerror' in error_desc or 'connection' in error_desc or 'database' in error_desc:
            return jsonify({
                'success': False,
                'message': '数据库连接失败,请稍后重试'
            }), 503
        elif 'timeout' in error_desc:
            return jsonify({
                'success': False,
                'message': '请求处理超时,请稍后重试'
            }), 503
        elif 'attributeerror' in error_desc:
            return jsonify({
                'success': False,
                'message': '数据关联错误,请联系管理员'
            }), 500
        elif 'keyerror' in error_desc:
            return jsonify({
                'success': False,
                'message': '数据缺失错误,请联系管理员'
            }), 500
        elif 'validation' in error_desc or 'validate' in error_desc:
            return jsonify({
                'success': False,
                'message': '数据验证失败,请检查输入参数'
            }), 400
        else:
            # Unknown failure: 500 with no technical detail.
            return jsonify({
                'success': False,
                'message': '服务器内部错误'
            }), 500


@api_bp.route('/auth/activate', methods=['POST'])
def activate_license():
    """Activate a license key for a machine.

    Expects a JSON body with ``software_id``, ``license_key`` and
    ``machine_code``. ``software_version`` is optional and defaults to
    ``'1.0.0'``.

    Returns:
        A Flask JSON response. On success ``data`` holds
        ``license_obj.to_dict()``. Failures return 400 (bad input or
        activation refused), 404 (unknown product or license) or 500.
    """
    # NOTE(review): unlike /auth/verify, this endpoint checks neither a
    # signature nor a rate limit. Confirm that is intentional.
    try:
        # silent=True returns None on malformed JSON or an unsupported
        # Content-Type, so the client gets a 400 instead of a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '请求数据为空'
            }), 400

        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        software_version = data.get('software_version', '1.0.0')

        if not all([software_id, license_key, machine_code]):
            return jsonify({
                'success': False,
                'message': '缺少必要参数'
            }), 400

        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({
                'success': False,
                'message': '产品不存在'
            }), 404

        # License lookup, scoped to the product.
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return jsonify({
                'success': False,
                'message': '卡密不存在'
            }), 404

        success, message = license_obj.activate(machine_code, software_version)
        if not success:
            return jsonify({
                'success': False,
                'message': message
            }), 400

        db.session.commit()
        return jsonify({
            'success': True,
            'message': '激活成功',
            'data': license_obj.to_dict()
        })
    except Exception as e:
        # Discard any partial changes so the session stays usable, matching
        # the unbind handlers in this module.
        db.session.rollback()
        current_app.logger.error(f"卡密激活失败: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500


@api_bp.route('/auth/info', methods=['GET'])
def get_auth_info():
    """Return license details for an active device.

    Reads ``software_id`` and ``machine_code`` from the query string, finds
    the device with that machine code and ``status=1``, and reports on the
    license attached to it.

    Returns:
        A Flask JSON response. Success carries a ``data`` dict of license
        fields. Failures return 400 (missing parameters), 404 (no active
        device), 403 (license missing, for another product, or expired)
        or 500.
    """

    def _reject(reason, status_code):
        # Uniform failure payload for every early exit below.
        return jsonify({
            'success': False,
            'message': reason
        }), status_code

    def _iso(moment):
        # ISO-8601 text for a set value, None otherwise.
        return moment.isoformat() if moment else None

    try:
        query_args = request.args
        software_id = query_args.get('software_id')
        machine_code = query_args.get('machine_code')
        if not (software_id and machine_code):
            return _reject('缺少必要参数', 400)

        # Only devices with status=1 are considered.
        device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if not device:
            return _reject('设备未激活', 404)

        # The attached license must exist and belong to the requested product.
        license_obj = device.license
        if not license_obj or license_obj.product_id != software_id:
            return _reject('授权信息不匹配', 403)

        if license_obj.is_expired():
            return _reject('授权已过期', 403)

        payload = {
            'license_key': license_obj.license_key,
            'type': license_obj.type,
            'type_name': '试用' if license_obj.is_trial() else '正式',
            'expire_time': _iso(license_obj.expire_time),
            'remaining_days': license_obj.get_remaining_days(),
            'activate_time': _iso(license_obj.activate_time),
            'last_verify_time': _iso(license_obj.last_verify_time)
        }
        return jsonify({
            'success': True,
            'data': payload
        })
    except Exception as e:
        current_app.logger.error(f"获取授权信息失败: {str(e)}")
        return _reject('服务器内部错误', 500)


@api_bp.route('/auth/heartbeat', methods=['POST'])
def heartbeat():
    """Heartbeat endpoint used for online verification.

    Expects a JSON body with ``software_id`` and ``machine_code``. When a
    device with that machine code and ``status=1`` exists, its verify time
    is refreshed and committed.

    Returns:
        ``{'success': True}`` on a well-formed request, whether or not a
        device was found; ``{'success': False}`` with 400 on bad input or
        500 on an unexpected error.
    """
    try:
        # silent=True returns None on malformed JSON or an unsupported
        # Content-Type, so the client gets a 400 instead of a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False}), 400

        software_id = data.get('software_id')
        machine_code = data.get('machine_code')
        if not software_id or not machine_code:
            return jsonify({'success': False}), 400

        device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if device:
            device.update_verify_time()
            # Persist the update, as /auth/verify does after the same call.
            # Without a commit the change is dropped when the request ends.
            db.session.commit()

        return jsonify({'success': True})
    except Exception as e:
        # Discard any partial changes so the session stays usable.
        db.session.rollback()
        current_app.logger.error(f"心跳处理失败: {str(e)}")
        return jsonify({'success': False}), 500


@api_bp.route('/auth/unbind_device', methods=['POST'])
def unbind_device():
    """Unbind the calling device from its license (client-side endpoint).

    Expects a JSON body with ``software_id``, ``machine_code``,
    ``timestamp`` and ``signature``. The signature is checked against
    ``generate_signature(software_id + machine_code + timestamp)`` using the
    ``AUTH_SECRET_KEY`` config value.

    Returns:
        A Flask JSON response: success, or a failure with 400 (bad input),
        401 (bad signature), 404 (unknown product or device) or 500.
    """
    try:
        # silent=True returns None on malformed JSON or an unsupported
        # Content-Type, so the client gets a 400 instead of a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '请求数据为空'
            }), 400

        software_id = data.get('software_id')
        machine_code = data.get('machine_code')
        timestamp = data.get('timestamp')
        signature = data.get('signature')

        if not all([software_id, machine_code, timestamp, signature]):
            return jsonify({
                'success': False,
                'message': '缺少必要参数'
            }), 400

        # NOTE(review): the timestamp is signed but never checked for
        # freshness here, so a captured request can be replayed.
        # /auth/verify enforces a 300-second window; confirm whether this
        # endpoint should as well.
        secret_key = current_app.config.get('AUTH_SECRET_KEY')
        if not secret_key:
            current_app.logger.error("AUTH_SECRET_KEY未配置")
            return jsonify({
                'success': False,
                'message': '服务器配置错误:密钥未设置'
            }), 500

        signature_data = f"{software_id}{machine_code}{timestamp}"
        expected_signature = generate_signature(signature_data, secret_key)

        # Constant-time comparison avoids leaking how many leading characters
        # matched. A non-string on either side counts as a mismatch.
        signature_ok = (
            isinstance(signature, str)
            and isinstance(expected_signature, str)
            and hmac.compare_digest(
                signature.encode('utf-8'),
                expected_signature.encode('utf-8'),
            )
        )
        if not signature_ok:
            return jsonify({
                'success': False,
                'message': '签名验证失败'
            }), 401

        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({
                'success': False,
                'message': '产品不存在'
            }), 404

        # Device lookup, scoped to the product.
        device = Device.query.filter_by(
            machine_code=machine_code,
            product_id=software_id
        ).first()
        if not device:
            return jsonify({
                'success': False,
                'message': '设备不存在'
            }), 404

        device.unbind_license()
        db.session.commit()
        return jsonify({
            'success': True,
            'message': '设备解绑成功'
        })
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"设备解绑失败: {str(e)}")
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500


@api_bp.route('/auth/unbind_license_user', methods=['POST'])
def unbind_license_user():
    """Unbind a license from the calling machine (client-side endpoint).

    Expects a JSON body with ``software_id``, ``license_key``,
    ``machine_code``, ``timestamp`` and ``signature``. The signature is
    checked against ``generate_signature(software_id + license_key +
    machine_code + timestamp)`` using the ``AUTH_SECRET_KEY`` config value.
    The license must currently be bound to ``machine_code``.

    Returns:
        A Flask JSON response: success, or a failure with 400 (bad input,
        license bound elsewhere, or unbind refused), 401 (bad signature),
        404 (unknown product or license) or 500.
    """
    try:
        # silent=True returns None on malformed JSON or an unsupported
        # Content-Type, so the client gets a 400 instead of a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '请求数据为空'
            }), 400

        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        timestamp = data.get('timestamp')
        signature = data.get('signature')

        if not all([software_id, license_key, machine_code, timestamp, signature]):
            return jsonify({
                'success': False,
                'message': '缺少必要参数'
            }), 400

        # NOTE(review): the timestamp is signed but never checked for
        # freshness here, so a captured request can be replayed.
        # /auth/verify enforces a 300-second window; confirm whether this
        # endpoint should as well.
        secret_key = current_app.config.get('AUTH_SECRET_KEY')
        if not secret_key:
            current_app.logger.error("AUTH_SECRET_KEY未配置")
            return jsonify({
                'success': False,
                'message': '服务器配置错误:密钥未设置'
            }), 500

        signature_data = f"{software_id}{license_key}{machine_code}{timestamp}"
        expected_signature = generate_signature(signature_data, secret_key)

        # Constant-time comparison avoids leaking how many leading characters
        # matched. A non-string on either side counts as a mismatch.
        signature_ok = (
            isinstance(signature, str)
            and isinstance(expected_signature, str)
            and hmac.compare_digest(
                signature.encode('utf-8'),
                expected_signature.encode('utf-8'),
            )
        )
        if not signature_ok:
            return jsonify({
                'success': False,
                'message': '签名验证失败'
            }), 401

        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({
                'success': False,
                'message': '产品不存在'
            }), 404

        # License lookup, scoped to the product.
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return jsonify({
                'success': False,
                'message': '卡密不存在'
            }), 404

        # Only the machine the license is bound to may unbind it.
        if license_obj.bind_machine_code != machine_code:
            return jsonify({
                'success': False,
                'message': '卡密未绑定到当前设备'
            }), 400

        success, message = license_obj.unbind()
        if not success:
            return jsonify({
                'success': False,
                'message': message
            }), 400

        db.session.commit()
        return jsonify({
            'success': True,
            'message': '解绑成功'
        })
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"卡密解绑失败: {str(e)}")
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500