Kamixitong/app/api/auth.py
2025-11-13 16:51:51 +08:00

312 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import request, jsonify, current_app
from datetime import datetime, timedelta
from app import db
from app.models import Product, License, Device, Version
from app.utils.crypto import generate_hash, verify_hash, generate_signature
from . import api_bp
def _signatures_match(provided, expected):
    """Return True when two signature strings are equal, in constant time.

    Anything that is not a ``str`` on either side is treated as a mismatch,
    so a client sending a number or object as ``signature`` is rejected
    instead of raising inside the comparison.
    """
    import hmac

    if not isinstance(provided, str) or not isinstance(expected, str):
        return False
    # Encode first: compare_digest() rejects non-ASCII str arguments.
    return hmac.compare_digest(provided.encode('utf-8'), expected.encode('utf-8'))


@api_bp.route('/auth/verify', methods=['POST'])
def verify_license():
    """Verify a license key for a product/machine pair (POST /auth/verify).

    Expects a JSON object with ``software_id``, ``license_key``,
    ``machine_code``, ``timestamp`` (Unix epoch seconds) and ``signature``.
    ``software_version`` is optional and defaults to ``'1.0.0'``.

    Returns:
        A JSON response (with an HTTP status on failure):
        * 400 - body missing/malformed, required field missing, bad or
          stale timestamp (more than 300 seconds away from server time)
        * 401 - signature mismatch
        * 404 - unknown product or license key
        * 403 - product disabled, or the license rejected verify/activate
        * 500 - unexpected server error
        * 200 - ``{'success': True, 'message': ..., 'data': {...}}`` with the
          license details and, when a published version exists, update info
    """
    import time

    try:
        # silent=True: malformed JSON / wrong content type yields None instead
        # of raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'success': False, 'message': '请求数据为空'}), 400
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': '请求数据格式错误'}), 400

        # Request parameters
        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        timestamp = data.get('timestamp')
        signature = data.get('signature')
        software_version = data.get('software_version', '1.0.0')

        # Required parameters
        if not all([software_id, license_key, machine_code, timestamp, signature]):
            return jsonify({'success': False, 'message': '缺少必要参数'}), 400

        # Timestamp check (replay protection). Compare epoch seconds directly:
        # going through naive datetimes would mix local time with UTC and
        # reject valid requests on any server whose timezone is not UTC.
        try:
            time_diff = abs(time.time() - int(timestamp))
        except (ValueError, TypeError, OverflowError):
            return jsonify({'success': False, 'message': '时间戳格式错误'}), 400
        if time_diff > 300:  # 5-minute validity window
            return jsonify({'success': False, 'message': '请求已过期'}), 400

        # Signature check
        # NOTE(review): the hard-coded fallback key means a deployment without
        # AUTH_SECRET_KEY accepts signatures made with a publicly known key -
        # confirm the config always sets it.
        secret_key = current_app.config.get('AUTH_SECRET_KEY', 'default-secret-key')
        signature_data = f"{software_id}{license_key}{machine_code}{timestamp}"
        expected_signature = generate_signature(signature_data, secret_key)
        signature_ok = _signatures_match(signature, expected_signature)

        # Debug output (debug mode only; the secret key is never logged)
        if current_app.debug:
            current_app.logger.debug(f"签名数据: {signature_data}")
            current_app.logger.debug(f"客户端签名: {signature}")
            current_app.logger.debug(f"服务端签名匹配: {signature_ok}")

        if not signature_ok:
            return jsonify({'success': False, 'message': '签名验证失败'}), 401

        # Look up the product
        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({'success': False, 'message': '产品不存在'}), 404
        if not product.is_enabled():
            return jsonify({'success': False, 'message': '产品已禁用'}), 403

        # Look up the license
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return jsonify({'success': False, 'message': '卡密不存在'}), 404

        # Verify the license
        success, message = license_obj.verify(machine_code, software_version)
        if not success:
            return jsonify({'success': False, 'message': message}), 403

        # First-time activation
        if license_obj.status == 0:  # not yet activated
            success, message = license_obj.activate(machine_code, software_version)
            if not success:
                return jsonify({'success': False, 'message': message}), 403

        # Refresh the device's last-verified time
        device = Device.query.filter_by(machine_code=machine_code).first()
        if device:
            device.update_verify_time()

        # Most recently created published version of this product
        latest_version = Version.query.filter_by(
            product_id=software_id,
            publish_status=1
        ).order_by(Version.create_time.desc()).first()

        response_data = {
            'license_key': license_obj.license_key,
            'type': license_obj.type,
            'type_name': '试用' if license_obj.is_trial() else '正式',
            'expire_time': license_obj.expire_time.isoformat() if license_obj.expire_time else None,
            'remaining_days': license_obj.get_remaining_days(),
            'product_name': product.product_name,
            'activate_time': license_obj.activate_time.isoformat() if license_obj.activate_time else None
        }

        # Attach version/update information
        if latest_version:
            response_data.update({
                'new_version': latest_version.version_num,
                'download_url': latest_version.download_url,
                'force_update': latest_version.is_force_update(),
                'update_log': latest_version.update_log,
                'need_update': latest_version.version_num != software_version
            })

        return jsonify({
            'success': True,
            'message': '验证成功',
            'data': response_data
        })
    except Exception as e:
        # Top-level boundary: log with traceback, hide details from the client.
        current_app.logger.exception(f"卡密验证失败: {str(e)}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/auth/activate', methods=['POST'])
def activate_license():
    """Activate a license key on a machine (POST /auth/activate).

    Expects a JSON object with ``software_id``, ``license_key`` and
    ``machine_code``. ``software_version`` is optional and defaults to
    ``'1.0.0'``.

    Returns:
        A JSON response (with an HTTP status on failure):
        * 400 - body missing/malformed, required field missing, or the
          license refused activation (its message is passed through)
        * 404 - unknown product or license key
        * 403 - product disabled
        * 500 - unexpected server error
        * 200 - ``{'success': True, 'message': ..., 'data': license.to_dict()}``
    """
    try:
        # silent=True: malformed JSON / wrong content type yields None instead
        # of raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({
                'success': False,
                'message': '请求数据为空'
            }), 400
        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '请求数据格式错误'
            }), 400

        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        software_version = data.get('software_version', '1.0.0')

        # Required parameters
        if not all([software_id, license_key, machine_code]):
            return jsonify({
                'success': False,
                'message': '缺少必要参数'
            }), 400

        # Look up the product
        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return jsonify({
                'success': False,
                'message': '产品不存在'
            }), 404
        # Same gate as the verify endpoint: a disabled product must not
        # accept new activations.
        if not product.is_enabled():
            return jsonify({
                'success': False,
                'message': '产品已禁用'
            }), 403

        # Look up the license
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return jsonify({
                'success': False,
                'message': '卡密不存在'
            }), 404

        # Activate the license
        success, message = license_obj.activate(machine_code, software_version)
        if not success:
            return jsonify({
                'success': False,
                'message': message
            }), 400

        return jsonify({
            'success': True,
            'message': '激活成功',
            'data': license_obj.to_dict()
        })
    except Exception as e:
        # Top-level boundary: log with traceback, hide details from the client.
        current_app.logger.exception(f"卡密激活失败: {str(e)}")
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500
@api_bp.route('/auth/info', methods=['GET'])
def get_auth_info():
    """Return the authorization details bound to a device (GET /auth/info).

    Reads ``software_id`` and ``machine_code`` from the query string, looks
    up a device with that machine code and ``status=1``, and reports the
    license attached to it.

    Returns:
        A JSON response (with an HTTP status on failure):
        * 400 - a query parameter is missing
        * 404 - no matching device with ``status=1``
        * 403 - the device's license is absent, belongs to another product,
          or is expired
        * 500 - unexpected server error
        * 200 - ``{'success': True, 'data': {...}}`` with the license details
    """

    def _failure(text, status):
        # Uniform error envelope used by every failure branch below.
        return jsonify({'success': False, 'message': text}), status

    def _iso_or_none(moment):
        # Serialize a datetime-like value, passing unset values through as None.
        return moment.isoformat() if moment else None

    try:
        query_args = request.args
        software_id = query_args.get('software_id')
        machine_code = query_args.get('machine_code')
        if not (software_id and machine_code):
            return _failure('缺少必要参数', 400)

        # Look up the device
        active_device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if not active_device:
            return _failure('设备未激活', 404)

        # The license associated with the device must belong to this product
        bound_license = active_device.license
        if not bound_license or bound_license.product_id != software_id:
            return _failure('授权信息不匹配', 403)

        # Reject expired authorizations
        if bound_license.is_expired():
            return _failure('授权已过期', 403)

        # Assemble the authorization payload
        payload = {
            'license_key': bound_license.license_key,
            'type': bound_license.type,
            'type_name': '试用' if bound_license.is_trial() else '正式',
            'expire_time': _iso_or_none(bound_license.expire_time),
            'remaining_days': bound_license.get_remaining_days(),
            'activate_time': _iso_or_none(bound_license.activate_time),
            'last_verify_time': _iso_or_none(bound_license.last_verify_time)
        }
        return jsonify({'success': True, 'data': payload})
    except Exception as e:
        current_app.logger.error(f"获取授权信息失败: {str(e)}")
        return _failure('服务器内部错误', 500)
@api_bp.route('/auth/heartbeat', methods=['POST'])
def heartbeat():
    """Heartbeat endpoint for online verification (POST /auth/heartbeat).

    Expects a JSON object with ``software_id`` and ``machine_code``. When a
    device with that machine code and ``status=1`` exists, its last-verified
    time is refreshed. The update is best-effort: an unknown device still
    gets a success response.

    Returns:
        ``{'success': True}`` on success; ``{'success': False}`` with 400 for
        a missing/malformed body or missing field, or with 500 on an
        unexpected server error.
    """
    try:
        # silent=True: malformed JSON / wrong content type yields None instead
        # of raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False}), 400

        software_id = data.get('software_id')
        machine_code = data.get('machine_code')
        if not software_id or not machine_code:
            return jsonify({'success': False}), 400

        # Refresh the device's last-verified time
        device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if device:
            device.update_verify_time()

        return jsonify({'success': True})
    except Exception as e:
        # Top-level boundary: log with traceback, hide details from the client.
        current_app.logger.exception(f"心跳处理失败: {str(e)}")
        return jsonify({'success': False}), 500