Kamixitong/test_license_expiration_fix.py

240 lines
7.8 KiB
Python
Raw Permalink Normal View History

2025-12-12 11:35:14 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
卡密过期状态更新功能测试脚本
此脚本用于验证过期卡密自动更新功能是否正常工作
"""
import sys
import os
from datetime import datetime, timedelta
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app, db
from app.models import License, Product
def create_test_license(app):
"""创建测试卡密"""
with app.app_context():
# 查找或创建测试产品
product = Product.query.filter_by(product_id='test-product').first()
if not product:
product = Product(
product_id='test-product',
product_name='测试产品',
status=1
)
db.session.add(product)
db.session.flush()
# 创建测试卡密1天有效期
license_obj = License(
product_id='test-product',
type=1, # 正式卡密
valid_days=1,
status=0 # 未激活
)
db.session.add(license_obj)
db.session.commit()
print(f"✅ 创建测试卡密: {license_obj.license_key}")
print(f" 卡密ID: {license_obj.license_id}")
print(f" 初始状态: {license_obj.status} ({license_obj.get_status_name()})")
return license_obj
def test_activate_license(license_obj, machine_code='test-machine-001'):
"""激活测试卡密"""
with license_obj.query.session.no_autoflush:
success, message = license_obj.activate(
machine_code=machine_code,
software_version='1.0.0'
)
if success:
print(f"✅ 卡密激活成功")
print(f" 激活时间: {license_obj.activate_time}")
print(f" 过期时间: {license_obj.expire_time}")
print(f" 当前状态: {license_obj.status} ({license_obj.get_status_name()})")
return True
else:
print(f"❌ 卡密激活失败: {message}")
return False
def test_expire_license(license_obj):
"""模拟卡密过期"""
with license_obj.query.session.no_autoflush:
# 将过期时间设置为过去
license_obj.expire_time = datetime.utcnow() - timedelta(days=1)
db.session.commit()
print(f"✅ 卡密已模拟过期")
print(f" 过期时间: {license_obj.expire_time}")
print(f" is_expired(): {license_obj.is_expired()}")
print(f" 当前状态: {license_obj.status} ({license_obj.get_status_name()})")
def test_manual_check(app, license_obj):
"""手动触发过期检查"""
with app.app_context():
from app.utils.background_tasks import update_expired_licenses
print("\n🔍 手动触发过期卡密检查...")
# 检查前状态
license_obj_before = License.query.get(license_obj.license_id)
print(f" 检查前状态: {license_obj_before.status} ({license_obj_before.get_status_name()})")
# 执行检查
result = update_expired_licenses()
# 检查后状态
license_obj_after = License.query.get(license_obj.license_id)
print(f" 检查后状态: {license_obj_after.status} ({license_obj_after.get_status_name()})")
if result['success']:
print(f"✅ 检查执行成功")
print(f" 更新数量: {result['updated_count']}")
return True
else:
print(f"❌ 检查执行失败: {result['message']}")
return False
def test_scheduler_status(app):
"""测试定时任务状态"""
with app.app_context():
from app.utils.scheduler import get_job_status
print("\n📊 定时任务状态:")
status = get_job_status()
if status['running']:
print(f"✅ 调度器运行中")
print(f" 任务数量: {len(status['jobs'])}")
for job in status['jobs']:
print(f" - {job['name']}")
print(f" 下次执行: {job['next_run_time']}")
else:
print(f"⚠️ 调度器未运行")
return status['running']
def test_batch_check(app):
"""测试批量检查功能"""
with app.app_context():
from app.utils.background_tasks import check_licenses_batch
print("\n📈 批量检查所有卡密状态:")
result = check_licenses_batch()
if result['success']:
stats = result.get('statistics', {})
print(f"✅ 批量检查完成")
print(f" 已激活但过期: {stats.get('active_but_expired', 0)}")
print(f" 已过期且已标记: {stats.get('expired_and_marked', 0)}")
print(f" 已激活且有效: {stats.get('active_and_valid', 0)}")
print(f" 未激活: {stats.get('inactive', 0)}")
print(f" 已禁用: {stats.get('disabled', 0)}")
return True
else:
print(f"❌ 批量检查失败: {result['message']}")
return False
def cleanup_test_data(app, license_obj):
"""清理测试数据"""
with app.app_context():
# 删除测试卡密
db.session.delete(license_obj)
db.session.commit()
print(f"\n🧹 已清理测试数据")
def main():
"""主测试流程"""
print("=" * 60)
print("卡密过期状态自动更新功能测试")
print("=" * 60)
# 创建应用
app = create_app('testing')
with app.app_context():
db.create_all()
try:
# 测试1: 创建测试卡密
print("\n[测试 1] 创建测试卡密")
license_obj = create_test_license(app)
# 测试2: 激活卡密
print("\n[测试 2] 激活卡密")
if not test_activate_license(license_obj):
print("❌ 激活失败,退出测试")
return False
# 测试3: 模拟过期
print("\n[测试 3] 模拟卡密过期")
test_expire_license(license_obj)
# 测试4: 验证过期检查前状态
print("\n[验证] 过期检查前状态")
license_before = License.query.get(license_obj.license_id)
print(f" 卡密状态: {license_before.status} ({license_before.get_status_name()})")
print(f" is_expired(): {license_before.is_expired()}")
if license_before.status != 1:
print("⚠️ 卡密状态不是已激活,无法继续测试")
return False
# 测试5: 手动触发过期检查
print("\n[测试 4] 手动触发过期检查")
if not test_manual_check(app, license_obj):
print("❌ 手动检查失败")
return False
# 测试6: 验证检查后状态
print("\n[验证] 检查后状态")
license_after = License.query.get(license_obj.license_id)
print(f" 卡密状态: {license_after.status} ({license_after.get_status_name()})")
if license_after.status == 2:
print("✅ 卡密状态已正确更新为已过期")
else:
print(f"❌ 卡密状态未更新期望2已过期实际{license_after.status}")
return False
# 测试7: 测试批量检查
print("\n[测试 5] 测试批量检查功能")
test_batch_check(app)
# 测试8: 测试定时任务状态
print("\n[测试 6] 测试定时任务状态")
test_scheduler_status(app)
# 清理测试数据
cleanup_test_data(app, license_obj)
print("\n" + "=" * 60)
print("✅ 所有测试通过!过期卡密自动更新功能正常工作")
print("=" * 60)
return True
except Exception as e:
print(f"\n❌ 测试过程中发生错误: {str(e)}")
import traceback
traceback.print_exc()
return False
if __name__ == '__main__':
success = main()
sys.exit(0 if success else 1)