Exeprotector/api_server_lite.py
2025-10-28 13:18:45 +08:00

593 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
轻量级API服务器 - 个人管理版
只负责验证卡密,不含管理功能
适合部署在云服务器上,占用资源极少
作者:太一
微信taiyi1224
"""
from flask import Flask, request, jsonify
import mysql.connector
from datetime import datetime
import os
import hmac
import hashlib
import time
import secrets
from functools import wraps
app = Flask(__name__)
# ========== 配置(从环境变量读取,更安全) ==========
DB_CONFIG = {
'host': os.environ.get('DB_HOST', 'localhost'),
'user': os.environ.get('DB_USER', 'root'),
'password': os.environ.get('DB_PASSWORD', 'taiyi1224'),
'database': os.environ.get('DB_NAME', 'exeprotector'),
'connect_timeout': 10,
'autocommit': False # 修复改为False以支持手动事务
}
# API密钥用于验证客户端身份
API_KEY = os.environ.get('API_KEY', 'taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224taiyi1224')
# 简单的内存缓存(减少数据库查询,提高性能)
cache = {}
CACHE_TTL = 300 # 缓存有效期5分钟
CACHE_MAX_SIZE = 1000 # 最大缓存条目数
# 速率限制配置
RATE_LIMIT = 10 # 每分钟最多请求次数
rate_limit_data = {} # {ip: [timestamp1, timestamp2, ...]}
# ========== 辅助函数 ==========
def check_rate_limit():
"""检查速率限制"""
ip = request.remote_addr
current_time = time.time()
# 清理过期的时间戳
if ip in rate_limit_data:
rate_limit_data[ip] = [t for t in rate_limit_data[ip] if current_time - t < 60]
else:
rate_limit_data[ip] = []
# 检查是否超过限制
if len(rate_limit_data[ip]) >= RATE_LIMIT:
return False
# 记录本次请求
rate_limit_data[ip].append(current_time)
return True
def require_api_key(f):
"""验证API密钥的装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
# 速率限制检查
if not check_rate_limit():
log_request('rate_limited', 'Too many requests')
return jsonify({'success': False, 'message': '请求过于频繁,请稍后再试'}), 429
# API密钥验证
key = request.headers.get('X-API-Key')
if not key or key != API_KEY:
log_request('unauthorized', 'Invalid API key')
return jsonify({'success': False, 'message': '未授权访问'}), 401
return f(*args, **kwargs)
return decorated
def clean_cache():
"""清理过期的缓存和超大缓存"""
global cache
current_time = time.time()
# 删除过期项
expired_keys = [k for k, (t, _) in cache.items() if current_time - t >= CACHE_TTL]
for k in expired_keys:
del cache[k]
# 如果缓存仍然太大,删除最旧的项
if len(cache) > CACHE_MAX_SIZE:
sorted_items = sorted(cache.items(), key=lambda x: x[1][0])
for k, _ in sorted_items[:len(cache) - CACHE_MAX_SIZE]:
del cache[k]
def get_db():
"""获取数据库连接"""
try:
return mysql.connector.connect(**DB_CONFIG)
except mysql.connector.Error as e:
print(f"数据库连接失败: {e}")
raise
def verify_signature(license_key, machine_code, software_identifier, signature):
"""验证请求签名(防止伪造请求)
software_identifier 可以是 software_name 或 software_id
"""
expected = hmac.new(
API_KEY.encode(),
f"{license_key}{machine_code}{software_identifier}".encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
def log_request(action, message, details=None):
"""记录请求日志(可选)"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ip = request.remote_addr
print(f"[{timestamp}] {ip} - {action}: {message}")
if details:
print(f" 详情: {details}")
# ========== API路由 ==========
@app.route('/api/health', methods=['GET'])
def health():
"""
健康检查接口
用于检测API服务器是否正常运行
"""
return jsonify({
'status': 'ok',
'timestamp': time.time(),
'version': '1.0'
})
@app.route('/api/validate', methods=['POST'])
@require_api_key
def validate():
"""
验证卡密接口
请求格式:
{
"license_key": "XXXXX-XXXXX-XXXXX-XXXXX",
"machine_code": "1234567890ABCDEF",
"software_name": "MyApp",
"software_id": "uuid-string", # 可选V2验证器使用
"signature": "hmac_sha256_hash"
}
返回格式:
{
"success": true/false,
"message": "提示信息"
}
"""
try:
# 解析请求数据
data = request.get_json()
if not data:
return jsonify({'success': False, 'message': '无效的请求数据'}), 400
license_key = data.get('license_key', '').strip()
machine_code = data.get('machine_code', '').strip()
software_name = data.get('software_name', '').strip()
software_id = data.get('software_id', '').strip() # V2验证器支持
signature = data.get('signature', '')
# 参数验证software_name 或 software_id 至少有一个)
if not all([license_key, machine_code, signature]) or not (software_name or software_id):
log_request('validate_error', '缺少必要参数')
return jsonify({'success': False, 'message': '缺少必要参数'}), 400
# 验证签名始终使用software_name保持与客户端一致
if not verify_signature(license_key, machine_code, software_name, signature):
log_request('validate_error', '签名验证失败', {
'license_key': license_key[:10] + '...',
'machine_code': machine_code,
'software_name': software_name,
'software_id': software_id if software_id else 'N/A'
})
return jsonify({'success': False, 'message': '请求签名无效'}), 400
# 检查缓存使用software_name作为键
cache_key = f"{license_key}:{machine_code}:{software_name}"
if cache_key in cache:
cached_time, cached_result = cache[cache_key]
if time.time() - cached_time < CACHE_TTL:
log_request('validate_cache', '使用缓存结果', {'license_key': license_key[:10] + '...'})
return jsonify(cached_result)
else:
# 缓存过期,删除
del cache[cache_key]
# 定期清理缓存
if len(cache) > CACHE_MAX_SIZE * 0.9: # 达到90%时清理
clean_cache()
# 数据库验证
conn = get_db()
cursor = conn.cursor(dictionary=True)
try:
# 1. 获取软件记录使用software_name如果不存在则自动创建
cursor.execute(
'SELECT id, name FROM software_products WHERE name=%s',
(software_name,)
)
software = cursor.fetchone()
if not software:
# 自动创建软件产品记录
log_request('validate_info', f'软件未注册,自动创建: {software_name}')
try:
cursor.execute(
'INSERT INTO software_products (name, description, created_at) VALUES (%s, %s, NOW())',
(software_name, f'自动创建于激活时 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
)
conn.commit()
software_db_id = cursor.lastrowid
log_request('validate_info', f'软件自动创建成功: {software_name}, ID: {software_db_id}')
except Exception as e:
log_request('validate_error', f'自动创建软件失败: {software_name}, 错误: {e}')
return jsonify({
'success': False,
'message': f'软件未注册且无法自动创建: {software_name}。请联系管理员在服务器数据库中添加此软件。'
})
else:
software_db_id = software['id']
# 2. 查询卡密信息
cursor.execute(
'SELECT * FROM license_keys WHERE key_code=%s AND software_id=%s',
(license_key, software_db_id)
)
key_info = cursor.fetchone()
if not key_info:
log_request('validate_error', '激活码不存在', {
'license_key': license_key,
'software_name': software_name
})
return jsonify({'success': False, 'message': '激活码不存在'})
# 3. 检查封禁状态
if key_info['status'] == 'banned':
log_request('validate_error', '激活码已被封禁', {
'license_key': license_key
})
return jsonify({'success': False, 'message': '激活码已被封禁'})
# 4. 检查过期时间
if key_info['end_time'] < datetime.now():
# 更新状态为已过期
cursor.execute(
"UPDATE license_keys SET status='expired' WHERE key_code=%s",
(license_key,)
)
log_request('validate_error', '激活码已过期', {
'license_key': license_key,
'end_time': key_info['end_time']
})
return jsonify({'success': False, 'message': '激活码已过期'})
# 5. 处理已激活的卡密
if key_info['status'] == 'active':
if key_info['machine_code'] != machine_code:
log_request('validate_error', '设备不匹配', {
'license_key': license_key,
'stored': key_info['machine_code'][:8] + '...',
'provided': machine_code[:8] + '...'
})
return jsonify({
'success': False,
'message': '此激活码已在其他设备上使用'
})
else:
# 验证成功,缓存结果,返回到期时间
remaining_days = (key_info['end_time'] - datetime.now()).days
# 生成验证token用于离线验证
token = secrets.token_urlsafe(32)
result = {
'success': True,
'message': '验证成功',
'expires_at': key_info['end_time'].isoformat(),
'start_time': key_info['start_time'].isoformat() if key_info['start_time'] else datetime.now().isoformat(),
'remaining_days': max(0, remaining_days),
'token': token
}
cache[cache_key] = (time.time(), result)
log_request('validate_success', '验证成功', {
'license_key': license_key[:10] + '...',
'machine_code': machine_code,
'remaining_days': remaining_days,
'expires_at': result['expires_at']
})
return jsonify(result)
# 6. 处理首次激活(使用事务确保并发安全)
if key_info['status'] == 'unused':
try:
# 确保之前的任何隐式事务已提交,然后开始新事务
if conn.in_transaction:
conn.commit()
conn.start_transaction()
# 再次检查状态(防止并发问题)
cursor.execute(
'SELECT status FROM license_keys WHERE key_code=%s FOR UPDATE',
(license_key,)
)
current_status = cursor.fetchone()
if not current_status or current_status['status'] != 'unused':
conn.rollback()
log_request('validate_error', '激活码状态已变更(并发)', {
'license_key': license_key
})
return jsonify({
'success': False,
'message': '激活码已被使用'
})
# 激活
cursor.execute("""
UPDATE license_keys
SET status='active', machine_code=%s, start_time=NOW()
WHERE key_code=%s
""", (machine_code, license_key))
# 提交事务
conn.commit()
except Exception as e:
conn.rollback()
log_request('database_error', f'激活事务失败: {e}')
return jsonify({
'success': False,
'message': '激活失败,请重试'
}), 500
# 激活成功,缓存结果,返回到期时间
# 重新查询获取完整信息
cursor.execute(
'SELECT * FROM license_keys WHERE key_code=%s',
(license_key,)
)
activated_key = cursor.fetchone()
remaining_days = (activated_key['end_time'] - datetime.now()).days
# 生成验证token用于离线验证
token = secrets.token_urlsafe(32)
result = {
'success': True,
'message': '激活成功',
'expires_at': activated_key['end_time'].isoformat(),
'start_time': activated_key['start_time'].isoformat() if activated_key['start_time'] else datetime.now().isoformat(),
'remaining_days': max(0, remaining_days),
'token': token
}
cache[cache_key] = (time.time(), result)
log_request('activate_success', '首次激活成功', {
'license_key': license_key[:10] + '...',
'machine_code': machine_code,
'software_name': software_name,
'remaining_days': remaining_days,
'expires_at': result['expires_at'],
'start_time': result['start_time']
})
return jsonify(result)
# 7. 其他状态(不应该到达这里)
log_request('validate_error', '激活码状态异常', {
'license_key': license_key,
'status': key_info['status']
})
return jsonify({
'success': False,
'message': f'激活码状态异常: {key_info["status"]}'
})
finally:
cursor.close()
conn.close()
except mysql.connector.Error as e:
log_request('database_error', str(e))
return jsonify({
'success': False,
'message': '数据库错误,请稍后重试'
}), 500
except Exception as e:
log_request('server_error', str(e))
print(f"验证过程出错: {e}")
import traceback
traceback.print_exc()
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
@app.route('/api/license/status', methods=['POST'])
@require_api_key
def license_status():
"""
查询许可证状态接口
用于客户端定期刷新状态
请求格式:
{
"license_key": "XXXXX-XXXXX-XXXXX-XXXXX",
"machine_code": "1234567890ABCDEF",
"software_name": "MyApp"
}
返回格式:
{
"success": true/false,
"status": "active/expired/banned",
"end_time": "2025-12-31T23:59:59",
"remaining_days": 30,
"is_expired": false,
"needs_renewal": false
}
"""
try:
data = request.get_json()
license_key = data.get('license_key', '').strip()
machine_code = data.get('machine_code', '').strip()
software_name = data.get('software_name', '').strip()
if not all([license_key, machine_code, software_name]):
return jsonify({
'success': False,
'message': '缺少必要参数'
}), 400
conn = get_db()
cursor = conn.cursor(dictionary=True)
try:
# 获取软件ID
cursor.execute(
'SELECT id FROM software_products WHERE name=%s',
(software_name,)
)
software = cursor.fetchone()
if not software:
return jsonify({
'success': False,
'message': '软件未注册'
})
# 查询许可证信息
cursor.execute('''
SELECT * FROM license_keys
WHERE key_code=%s AND software_id=%s
''', (license_key, software['id']))
key_info = cursor.fetchone()
if not key_info:
return jsonify({
'success': False,
'message': '许可证不存在'
})
# 验证机器码
if key_info['machine_code'] and key_info['machine_code'] != machine_code:
return jsonify({
'success': False,
'message': '设备不匹配'
})
# 计算剩余天数
end_time = key_info['end_time']
remaining_days = (end_time - datetime.now()).days
is_expired = remaining_days < 0
needs_renewal = remaining_days <= 7
# 返回状态信息
return jsonify({
'success': True,
'status': key_info['status'],
'end_time': end_time.isoformat(),
'remaining_days': max(0, remaining_days),
'is_expired': is_expired,
'needs_renewal': needs_renewal
})
finally:
cursor.close()
conn.close()
except Exception as e:
log_request('status_error', str(e))
return jsonify({
'success': False,
'message': '查询失败'
}), 500
@app.route('/api/stats', methods=['GET'])
@require_api_key
def stats():
"""
获取统计信息(可选)
需要API密钥验证
"""
try:
conn = get_db()
cursor = conn.cursor(dictionary=True)
try:
# 统计各状态的卡密数量
cursor.execute("""
SELECT
status,
COUNT(*) as count
FROM license_keys
GROUP BY status
""")
status_counts = {row['status']: row['count'] for row in cursor.fetchall()}
# 统计今日激活数
cursor.execute("""
SELECT COUNT(*) as count
FROM license_keys
WHERE DATE(start_time) = CURDATE()
""")
today_activations = cursor.fetchone()['count']
return jsonify({
'success': True,
'stats': {
'status_counts': status_counts,
'today_activations': today_activations,
'cache_size': len(cache)
}
})
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"统计错误: {e}")
return jsonify({'success': False, 'message': '统计失败'}), 500
# ========== 错误处理 ==========
@app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'message': '接口不存在'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'message': '服务器内部错误'}), 500
# ========== 主函数 ==========
if __name__ == '__main__':
print("=" * 60)
print("轻量级API服务器 - 个人管理版")
print("=" * 60)
print(f"数据库: {DB_CONFIG['host']}/{DB_CONFIG['database']}")
print(f"端口: 5100")
print(f"API密钥: {'已设置' if API_KEY != 'change-this-to-random-string-32-chars-min' else '⚠️ 请修改默认密钥!'}")
print("=" * 60)
print()
print("生产环境请使用 gunicorn:")
print(" gunicorn -w 2 -b 0.0.0.0:5100 api_server_lite:app")
print()
print("测试接口:")
print(" curl http://localhost:5100/api/health")
print()
print("按 Ctrl+C 停止服务器")
print("=" * 60)
# 开发环境运行
app.run(host='0.0.0.0', port=5100, debug=False)