""" 轻量级API服务器 - 个人管理版 只负责验证卡密,不含管理功能 适合部署在云服务器上,占用资源极少 作者:太一 微信:taiyi1224 """ from flask import Flask, request, jsonify import mysql.connector from datetime import datetime import os import hmac import hashlib import time import secrets from functools import wraps app = Flask(__name__) # ========== 配置(从环境变量读取,更安全) ========== DB_CONFIG = { 'host': os.environ.get('DB_HOST', 'localhost'), 'user': os.environ.get('DB_USER', 'root'), 'password': os.environ.get('DB_PASSWORD', 'taiyi1224'), 'database': os.environ.get('DB_NAME', 'exeprotector'), 'connect_timeout': 10, 'autocommit': False # 修复:改为False以支持手动事务 } # API密钥(用于验证客户端身份) API_KEY = os.environ.get('API_KEY', 'taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224') # 简单的内存缓存(减少数据库查询,提高性能) cache = {} CACHE_TTL = 300 # 缓存有效期:5分钟 CACHE_MAX_SIZE = 1000 # 最大缓存条目数 # 速率限制配置 RATE_LIMIT = 10 # 每分钟最多请求次数 rate_limit_data = {} # {ip: [timestamp1, timestamp2, ...]} # ========== 辅助函数 ========== def check_rate_limit(): """检查速率限制""" ip = request.remote_addr current_time = time.time() # 清理过期的时间戳 if ip in rate_limit_data: rate_limit_data[ip] = [t for t in rate_limit_data[ip] if current_time - t < 60] else: rate_limit_data[ip] = [] # 检查是否超过限制 if len(rate_limit_data[ip]) >= RATE_LIMIT: return False # 记录本次请求 rate_limit_data[ip].append(current_time) return True def require_api_key(f): """验证API密钥的装饰器""" @wraps(f) def decorated(*args, **kwargs): # 速率限制检查 if not check_rate_limit(): log_request('rate_limited', 'Too many requests') return jsonify({'success': False, 'message': '请求过于频繁,请稍后再试'}), 429 # API密钥验证 key = request.headers.get('X-API-Key') if not key or key != API_KEY: log_request('unauthorized', 'Invalid API key') return jsonify({'success': False, 'message': '未授权访问'}), 401 return f(*args, **kwargs) return decorated def clean_cache(): """清理过期的缓存和超大缓存""" global cache current_time = time.time() # 删除过期项 expired_keys = [k for k, (t, _) in cache.items() if current_time - t >= CACHE_TTL] for k in expired_keys: del cache[k] # 如果缓存仍然太大,删除最旧的项 if len(cache) > CACHE_MAX_SIZE: sorted_items = sorted(cache.items(), key=lambda x: x[1][0]) for k, _ in sorted_items[:len(cache) - CACHE_MAX_SIZE]: del cache[k] def get_db(): """获取数据库连接""" try: return mysql.connector.connect(**DB_CONFIG) except mysql.connector.Error as e: print(f"数据库连接失败: {e}") raise def verify_signature(license_key, machine_code, software_identifier, signature): """验证请求签名(防止伪造请求) software_identifier 可以是 software_name 或 software_id """ expected = hmac.new( API_KEY.encode(), f"{license_key}{machine_code}{software_identifier}".encode(), hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected, signature) def log_request(action, message, details=None): """记录请求日志(可选)""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') ip = request.remote_addr print(f"[{timestamp}] {ip} - {action}: {message}") if details: print(f" 详情: {details}") # ========== API路由 ========== @app.route('/api/health', methods=['GET']) def health(): """ 健康检查接口 用于检测API服务器是否正常运行 """ return jsonify({ 'status': 'ok', 'timestamp': time.time(), 'version': '1.0' }) @app.route('/api/validate', methods=['POST']) @require_api_key def validate(): """ 验证卡密接口 请求格式: { "license_key": "XXXXX-XXXXX-XXXXX-XXXXX", "machine_code": "1234567890ABCDEF", "software_name": "MyApp", "software_id": "uuid-string", # 可选,V2验证器使用 "signature": "hmac_sha256_hash" } 返回格式: { "success": true/false, "message": "提示信息" } """ try: # 解析请求数据 data = request.get_json() if not data: return jsonify({'success': False, 'message': '无效的请求数据'}), 400 license_key = data.get('license_key', '').strip() machine_code = data.get('machine_code', '').strip() software_name = data.get('software_name', '').strip() software_id = data.get('software_id', '').strip() # V2验证器支持 signature = data.get('signature', '') # 参数验证(software_name 或 software_id 至少有一个) if not all([license_key, machine_code, signature]) or not (software_name or software_id): log_request('validate_error', '缺少必要参数') return jsonify({'success': False, 'message': '缺少必要参数'}), 400 # 验证签名(始终使用software_name,保持与客户端一致) if not verify_signature(license_key, machine_code, software_name, signature): log_request('validate_error', '签名验证失败', { 'license_key': license_key[:10] + '...', 'machine_code': machine_code, 'software_name': software_name, 'software_id': software_id if software_id else 'N/A' }) return jsonify({'success': False, 'message': '请求签名无效'}), 400 # 检查缓存(使用software_name作为键) cache_key = f"{license_key}:{machine_code}:{software_name}" if cache_key in cache: cached_time, cached_result = cache[cache_key] if time.time() - cached_time < CACHE_TTL: log_request('validate_cache', '使用缓存结果', {'license_key': license_key[:10] + '...'}) return jsonify(cached_result) else: # 缓存过期,删除 del cache[cache_key] # 定期清理缓存 if len(cache) > CACHE_MAX_SIZE * 0.9: # 达到90%时清理 clean_cache() # 数据库验证 conn = get_db() cursor = conn.cursor(dictionary=True) try: # 1. 获取软件记录(使用software_name),如果不存在则自动创建 cursor.execute( 'SELECT id, name FROM software_products WHERE name=%s', (software_name,) ) software = cursor.fetchone() if not software: # 自动创建软件产品记录 log_request('validate_info', f'软件未注册,自动创建: {software_name}') try: cursor.execute( 'INSERT INTO software_products (name, description, created_at) VALUES (%s, %s, NOW())', (software_name, f'自动创建于激活时 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}') ) conn.commit() software_db_id = cursor.lastrowid log_request('validate_info', f'软件自动创建成功: {software_name}, ID: {software_db_id}') except Exception as e: log_request('validate_error', f'自动创建软件失败: {software_name}, 错误: {e}') return jsonify({ 'success': False, 'message': f'软件未注册且无法自动创建: {software_name}。请联系管理员在服务器数据库中添加此软件。' }) else: software_db_id = software['id'] # 2. 查询卡密信息 cursor.execute( 'SELECT * FROM license_keys WHERE key_code=%s AND software_id=%s', (license_key, software_db_id) ) key_info = cursor.fetchone() if not key_info: log_request('validate_error', '激活码不存在', { 'license_key': license_key, 'software_name': software_name }) return jsonify({'success': False, 'message': '激活码不存在'}) # 3. 检查封禁状态 if key_info['status'] == 'banned': log_request('validate_error', '激活码已被封禁', { 'license_key': license_key }) return jsonify({'success': False, 'message': '激活码已被封禁'}) # 4. 检查过期时间 if key_info['end_time'] < datetime.now(): # 更新状态为已过期 cursor.execute( "UPDATE license_keys SET status='expired' WHERE key_code=%s", (license_key,) ) log_request('validate_error', '激活码已过期', { 'license_key': license_key, 'end_time': key_info['end_time'] }) return jsonify({'success': False, 'message': '激活码已过期'}) # 5. 处理已激活的卡密 if key_info['status'] == 'active': if key_info['machine_code'] != machine_code: log_request('validate_error', '设备不匹配', { 'license_key': license_key, 'stored': key_info['machine_code'][:8] + '...', 'provided': machine_code[:8] + '...' }) return jsonify({ 'success': False, 'message': '此激活码已在其他设备上使用' }) else: # 验证成功,缓存结果,返回到期时间 remaining_days = (key_info['end_time'] - datetime.now()).days # 生成验证token(用于离线验证) token = secrets.token_urlsafe(32) result = { 'success': True, 'message': '验证成功', 'expires_at': key_info['end_time'].isoformat(), 'start_time': key_info['start_time'].isoformat() if key_info['start_time'] else datetime.now().isoformat(), 'remaining_days': max(0, remaining_days), 'token': token } cache[cache_key] = (time.time(), result) log_request('validate_success', '验证成功', { 'license_key': license_key[:10] + '...', 'machine_code': machine_code, 'remaining_days': remaining_days, 'expires_at': result['expires_at'] }) return jsonify(result) # 6. 处理首次激活(使用事务确保并发安全) if key_info['status'] == 'unused': try: # 确保之前的任何隐式事务已提交,然后开始新事务 if conn.in_transaction: conn.commit() conn.start_transaction() # 再次检查状态(防止并发问题) cursor.execute( 'SELECT status FROM license_keys WHERE key_code=%s FOR UPDATE', (license_key,) ) current_status = cursor.fetchone() if not current_status or current_status['status'] != 'unused': conn.rollback() log_request('validate_error', '激活码状态已变更(并发)', { 'license_key': license_key }) return jsonify({ 'success': False, 'message': '激活码已被使用' }) # 激活 cursor.execute(""" UPDATE license_keys SET status='active', machine_code=%s, start_time=NOW() WHERE key_code=%s """, (machine_code, license_key)) # 提交事务 conn.commit() except Exception as e: conn.rollback() log_request('database_error', f'激活事务失败: {e}') return jsonify({ 'success': False, 'message': '激活失败,请重试' }), 500 # 激活成功,缓存结果,返回到期时间 # 重新查询获取完整信息 cursor.execute( 'SELECT * FROM license_keys WHERE key_code=%s', (license_key,) ) activated_key = cursor.fetchone() remaining_days = (activated_key['end_time'] - datetime.now()).days # 生成验证token(用于离线验证) token = secrets.token_urlsafe(32) result = { 'success': True, 'message': '激活成功', 'expires_at': activated_key['end_time'].isoformat(), 'start_time': activated_key['start_time'].isoformat() if activated_key['start_time'] else datetime.now().isoformat(), 'remaining_days': max(0, remaining_days), 'token': token } cache[cache_key] = (time.time(), result) log_request('activate_success', '首次激活成功', { 'license_key': license_key[:10] + '...', 'machine_code': machine_code, 'software_name': software_name, 'remaining_days': remaining_days, 'expires_at': result['expires_at'], 'start_time': result['start_time'] }) return jsonify(result) # 7. 其他状态(不应该到达这里) log_request('validate_error', '激活码状态异常', { 'license_key': license_key, 'status': key_info['status'] }) return jsonify({ 'success': False, 'message': f'激活码状态异常: {key_info["status"]}' }) finally: cursor.close() conn.close() except mysql.connector.Error as e: log_request('database_error', str(e)) return jsonify({ 'success': False, 'message': '数据库错误,请稍后重试' }), 500 except Exception as e: log_request('server_error', str(e)) print(f"验证过程出错: {e}") import traceback traceback.print_exc() return jsonify({ 'success': False, 'message': '服务器内部错误' }), 500 @app.route('/api/license/status', methods=['POST']) @require_api_key def license_status(): """ 查询许可证状态接口 用于客户端定期刷新状态 请求格式: { "license_key": "XXXXX-XXXXX-XXXXX-XXXXX", "machine_code": "1234567890ABCDEF", "software_name": "MyApp" } 返回格式: { "success": true/false, "status": "active/expired/banned", "end_time": "2025-12-31T23:59:59", "remaining_days": 30, "is_expired": false, "needs_renewal": false } """ try: data = request.get_json() license_key = data.get('license_key', '').strip() machine_code = data.get('machine_code', '').strip() software_name = data.get('software_name', '').strip() if not all([license_key, machine_code, software_name]): return jsonify({ 'success': False, 'message': '缺少必要参数' }), 400 conn = get_db() cursor = conn.cursor(dictionary=True) try: # 获取软件ID cursor.execute( 'SELECT id FROM software_products WHERE name=%s', (software_name,) ) software = cursor.fetchone() if not software: return jsonify({ 'success': False, 'message': '软件未注册' }) # 查询许可证信息 cursor.execute(''' SELECT * FROM license_keys WHERE key_code=%s AND software_id=%s ''', (license_key, software['id'])) key_info = cursor.fetchone() if not key_info: return jsonify({ 'success': False, 'message': '许可证不存在' }) # 验证机器码 if key_info['machine_code'] and key_info['machine_code'] != machine_code: return jsonify({ 'success': False, 'message': '设备不匹配' }) # 计算剩余天数 end_time = key_info['end_time'] remaining_days = (end_time - datetime.now()).days is_expired = remaining_days < 0 needs_renewal = remaining_days <= 7 # 返回状态信息 return jsonify({ 'success': True, 'status': key_info['status'], 'end_time': end_time.isoformat(), 'remaining_days': max(0, remaining_days), 'is_expired': is_expired, 'needs_renewal': needs_renewal }) finally: cursor.close() conn.close() except Exception as e: log_request('status_error', str(e)) return jsonify({ 'success': False, 'message': '查询失败' }), 500 @app.route('/api/stats', methods=['GET']) @require_api_key def stats(): """ 获取统计信息(可选) 需要API密钥验证 """ try: conn = get_db() cursor = conn.cursor(dictionary=True) try: # 统计各状态的卡密数量 cursor.execute(""" SELECT status, COUNT(*) as count FROM license_keys GROUP BY status """) status_counts = {row['status']: row['count'] for row in cursor.fetchall()} # 统计今日激活数 cursor.execute(""" SELECT COUNT(*) as count FROM license_keys WHERE DATE(start_time) = CURDATE() """) today_activations = cursor.fetchone()['count'] return jsonify({ 'success': True, 'stats': { 'status_counts': status_counts, 'today_activations': today_activations, 'cache_size': len(cache) } }) finally: cursor.close() conn.close() except Exception as e: print(f"统计错误: {e}") return jsonify({'success': False, 'message': '统计失败'}), 500 # ========== 错误处理 ========== @app.errorhandler(404) def not_found(error): return jsonify({'success': False, 'message': '接口不存在'}), 404 @app.errorhandler(500) def internal_error(error): return jsonify({'success': False, 'message': '服务器内部错误'}), 500 # ========== 主函数 ========== if __name__ == '__main__': print("=" * 60) print("轻量级API服务器 - 个人管理版") print("=" * 60) print(f"数据库: {DB_CONFIG['host']}/{DB_CONFIG['database']}") print(f"端口: 5100") print(f"API密钥: {'已设置' if API_KEY != 'change-this-to-random-string-32-chars-min' else '⚠️ 请修改默认密钥!'}") print("=" * 60) print() print("生产环境请使用 gunicorn:") print(" gunicorn -w 2 -b 0.0.0.0:5100 api_server_lite:app") print() print("测试接口:") print(" curl http://localhost:5100/api/health") print() print("按 Ctrl+C 停止服务器") print("=" * 60) # 开发环境运行 app.run(host='0.0.0.0', port=5100, debug=False)