import hmac
import time
from datetime import datetime, timedelta

from flask import request, jsonify, current_app

from app import db
from app.models import Product, License, Device, Version
from app.utils.crypto import generate_hash, verify_hash, generate_signature
from . import api_bp

# Maximum accepted distance (in seconds) between the client timestamp and the
# server clock; requests outside this window are treated as replays.
_TIMESTAMP_TOLERANCE_SECONDS = 300

# Version reported to the license model when the client does not send one.
_DEFAULT_SOFTWARE_VERSION = '1.0.0'


def _fail(message, status):
    """Build the standard ``{'success': False, 'message': ...}`` error response."""
    return jsonify({'success': False, 'message': message}), status


def _get_json_object():
    """Return the request body as a dict, or ``None`` if it is not a JSON object.

    ``silent=True`` makes Flask return ``None`` instead of raising on a wrong
    content type or malformed JSON, so callers can answer with a 400 rather
    than letting the error fall through to the generic 500 handler.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _timestamp_age_seconds(timestamp):
    """Return the absolute distance in seconds between *timestamp* and now.

    Both sides are epoch seconds, so the result does not depend on the
    server's local timezone.

    Raises:
        ValueError, TypeError, OverflowError: if *timestamp* cannot be
            converted to an integer.
    """
    return abs(time.time() - int(timestamp))


def _signatures_match(provided, expected):
    """Compare two signature strings in constant time.

    Returns ``False`` when either value is not a ``str`` so that malformed
    client input is rejected instead of raising.
    """
    if not isinstance(provided, str) or not isinstance(expected, str):
        return False
    # Compare bytes: compare_digest() on str only accepts ASCII, and
    # 'surrogatepass' keeps lone surrogates from JSON input from raising.
    return hmac.compare_digest(
        provided.encode('utf-8', 'surrogatepass'),
        expected.encode('utf-8', 'surrogatepass'),
    )


def _iso_or_none(value):
    """Return ``value.isoformat()``, or ``None`` when *value* is falsy."""
    return value.isoformat() if value else None


def _license_fields(license_obj):
    """Return the license fields shared by the verify and info responses."""
    return {
        'license_key': license_obj.license_key,
        'type': license_obj.type,
        'type_name': '试用' if license_obj.is_trial() else '正式',
        'expire_time': _iso_or_none(license_obj.expire_time),
        'remaining_days': license_obj.get_remaining_days(),
        'activate_time': _iso_or_none(license_obj.activate_time),
    }


@api_bp.route('/auth/verify', methods=['POST'])
def verify_license():
    """Verify a license key for a product/machine pair.

    Expects a JSON object with ``software_id``, ``license_key``,
    ``machine_code``, ``timestamp`` and ``signature`` (all required) and an
    optional ``software_version``.

    Returns:
        A JSON response. On success (HTTP 200) ``data`` holds the license
        details plus, when a published version exists, update information.
        Failures return ``success: False`` with 400 (bad/expired request),
        401 (bad signature), 403 (disabled product or rejected license),
        404 (unknown product or license) or 500 (unexpected error).
    """
    try:
        data = _get_json_object()
        if not data:
            return _fail('请求数据为空', 400)

        # Read request parameters.
        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        timestamp = data.get('timestamp')
        signature = data.get('signature')
        software_version = data.get('software_version', _DEFAULT_SOFTWARE_VERSION)

        # All five parameters are mandatory.
        if not all([software_id, license_key, machine_code, timestamp, signature]):
            return _fail('缺少必要参数', 400)

        # Reject stale timestamps to limit replay attacks.
        try:
            time_diff = _timestamp_age_seconds(timestamp)
        except (ValueError, TypeError, OverflowError):
            return _fail('时间戳格式错误', 400)
        if time_diff > _TIMESTAMP_TOLERANCE_SECONDS:
            return _fail('请求已过期', 400)

        # Verify the request signature.
        # NOTE(review): if AUTH_SECRET_KEY is not configured the hard-coded
        # fallback is used, which makes signatures forgeable — confirm that
        # production always sets this key.
        secret_key = current_app.config.get('AUTH_SECRET_KEY', 'default-secret-key')
        signature_data = f"{software_id}{license_key}{machine_code}{timestamp}"
        expected_signature = generate_signature(signature_data, secret_key)
        signature_ok = _signatures_match(signature, expected_signature)

        # Debug-only diagnostics; the secret key is never logged.
        if current_app.debug:
            current_app.logger.debug(f"签名数据: {signature_data}")
            current_app.logger.debug(f"客户端签名: {signature}")
            current_app.logger.debug(f"服务端签名匹配: {signature_ok}")

        if not signature_ok:
            return _fail('签名验证失败', 401)

        # Look up the product.
        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return _fail('产品不存在', 404)
        if not product.is_enabled():
            return _fail('产品已禁用', 403)

        # Look up the license for this product.
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return _fail('卡密不存在', 404)

        # Let the model decide whether the license is valid for this machine.
        success, message = license_obj.verify(machine_code, software_version)
        if not success:
            return _fail(message, 403)

        # First use: a license with status 0 (not activated) is activated now.
        if license_obj.status == 0:
            success, message = license_obj.activate(machine_code, software_version)
            if not success:
                return _fail(message, 403)

        # Refresh the device's last-verification time when the device is known.
        device = Device.query.filter_by(machine_code=machine_code).first()
        if device:
            device.update_verify_time()

        # Most recently created published version of the product, if any.
        latest_version = Version.query.filter_by(
            product_id=software_id,
            publish_status=1
        ).order_by(Version.create_time.desc()).first()

        response_data = _license_fields(license_obj)
        response_data['product_name'] = product.product_name

        # Attach update information when a published version exists.
        if latest_version:
            response_data.update({
                'new_version': latest_version.version_num,
                'download_url': latest_version.download_url,
                'force_update': latest_version.is_force_update(),
                'update_log': latest_version.update_log,
                'need_update': latest_version.version_num != software_version
            })

        return jsonify({
            'success': True,
            'message': '验证成功',
            'data': response_data
        })

    except Exception as e:
        # Top-level boundary: log with traceback and hide details from the client.
        current_app.logger.exception("卡密验证失败: %s", e)
        return _fail('服务器内部错误', 500)


@api_bp.route('/auth/activate', methods=['POST'])
def activate_license():
    """Activate a license key for a machine.

    Expects a JSON object with ``software_id``, ``license_key`` and
    ``machine_code`` (required) and an optional ``software_version``.

    NOTE(review): unlike ``/auth/verify`` this endpoint performs no timestamp
    or signature check — confirm that this is intended.

    Returns:
        A JSON response whose ``data`` is ``license_obj.to_dict()`` on
        success; 400 for bad input or a rejected activation, 404 for an
        unknown product or license, 500 for unexpected errors.
    """
    try:
        data = _get_json_object()
        if not data:
            return _fail('请求数据为空', 400)

        software_id = data.get('software_id')
        license_key = data.get('license_key')
        machine_code = data.get('machine_code')
        software_version = data.get('software_version', _DEFAULT_SOFTWARE_VERSION)

        # Required parameters.
        if not all([software_id, license_key, machine_code]):
            return _fail('缺少必要参数', 400)

        # Look up the product.
        product = Product.query.filter_by(product_id=software_id).first()
        if not product:
            return _fail('产品不存在', 404)

        # Look up the license for this product.
        license_obj = License.query.filter_by(
            license_key=license_key,
            product_id=software_id
        ).first()
        if not license_obj:
            return _fail('卡密不存在', 404)

        # Activate; the model reports success and a client-facing message.
        success, message = license_obj.activate(machine_code, software_version)
        if not success:
            return _fail(message, 400)

        return jsonify({
            'success': True,
            'message': '激活成功',
            'data': license_obj.to_dict()
        })

    except Exception as e:
        current_app.logger.exception("卡密激活失败: %s", e)
        return _fail('服务器内部错误', 500)


@api_bp.route('/auth/info', methods=['GET'])
def get_auth_info():
    """Return authorization details for an activated device.

    Reads ``software_id`` and ``machine_code`` from the query string.

    Returns:
        A JSON response with the license details on success; 400 when a
        parameter is missing, 404 when no device with status 1 matches,
        403 when the device's license belongs to another product or has
        expired, 500 for unexpected errors.
    """
    try:
        software_id = request.args.get('software_id')
        machine_code = request.args.get('machine_code')

        if not software_id or not machine_code:
            return _fail('缺少必要参数', 400)

        # Only devices with status 1 are considered.
        device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if not device:
            return _fail('设备未激活', 404)

        # The device's license must belong to the requested product.
        license_obj = device.license
        if not license_obj or license_obj.product_id != software_id:
            return _fail('授权信息不匹配', 403)

        # Reject expired licenses.
        if license_obj.is_expired():
            return _fail('授权已过期', 403)

        info = _license_fields(license_obj)
        info['last_verify_time'] = _iso_or_none(license_obj.last_verify_time)

        return jsonify({
            'success': True,
            'data': info
        })

    except Exception as e:
        current_app.logger.exception("获取授权信息失败: %s", e)
        return _fail('服务器内部错误', 500)


@api_bp.route('/auth/heartbeat', methods=['POST'])
def heartbeat():
    """Heartbeat endpoint used for online verification.

    Expects a JSON object with ``software_id`` and ``machine_code``. When a
    device with status 1 matches the machine code, its verification time is
    refreshed; the response is ``success: True`` whether or not a device was
    found.

    Returns:
        ``{'success': True}`` on success, ``{'success': False}`` with 400 for
        bad input or 500 for unexpected errors.
    """
    try:
        data = _get_json_object()
        if not data:
            return jsonify({'success': False}), 400

        software_id = data.get('software_id')
        machine_code = data.get('machine_code')

        if not software_id or not machine_code:
            return jsonify({'success': False}), 400

        # Refresh the device's last-verification time.
        device = Device.query.filter_by(
            machine_code=machine_code,
            status=1
        ).first()
        if device:
            device.update_verify_time()

        return jsonify({'success': True})

    except Exception as e:
        current_app.logger.exception("心跳处理失败: %s", e)
        return jsonify({'success': False}), 500