#!/usr/bin/env python # -*- coding: utf-8 -*- """ 卡密过期状态更新功能测试脚本 此脚本用于验证过期卡密自动更新功能是否正常工作 """ import sys import os from datetime import datetime, timedelta # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app import create_app, db from app.models import License, Product def create_test_license(app): """创建测试卡密""" with app.app_context(): # 查找或创建测试产品 product = Product.query.filter_by(product_id='test-product').first() if not product: product = Product( product_id='test-product', product_name='测试产品', status=1 ) db.session.add(product) db.session.flush() # 创建测试卡密(1天有效期) license_obj = License( product_id='test-product', type=1, # 正式卡密 valid_days=1, status=0 # 未激活 ) db.session.add(license_obj) db.session.commit() print(f"✅ 创建测试卡密: {license_obj.license_key}") print(f" 卡密ID: {license_obj.license_id}") print(f" 初始状态: {license_obj.status} ({license_obj.get_status_name()})") return license_obj def test_activate_license(license_obj, machine_code='test-machine-001'): """激活测试卡密""" with license_obj.query.session.no_autoflush: success, message = license_obj.activate( machine_code=machine_code, software_version='1.0.0' ) if success: print(f"✅ 卡密激活成功") print(f" 激活时间: {license_obj.activate_time}") print(f" 过期时间: {license_obj.expire_time}") print(f" 当前状态: {license_obj.status} ({license_obj.get_status_name()})") return True else: print(f"❌ 卡密激活失败: {message}") return False def test_expire_license(license_obj): """模拟卡密过期""" with license_obj.query.session.no_autoflush: # 将过期时间设置为过去 license_obj.expire_time = datetime.utcnow() - timedelta(days=1) db.session.commit() print(f"✅ 卡密已模拟过期") print(f" 过期时间: {license_obj.expire_time}") print(f" is_expired(): {license_obj.is_expired()}") print(f" 当前状态: {license_obj.status} ({license_obj.get_status_name()})") def test_manual_check(app, license_obj): """手动触发过期检查""" with app.app_context(): from app.utils.background_tasks import update_expired_licenses print("\n🔍 手动触发过期卡密检查...") # 检查前状态 license_obj_before = License.query.get(license_obj.license_id) print(f" 检查前状态: {license_obj_before.status} ({license_obj_before.get_status_name()})") # 执行检查 result = update_expired_licenses() # 检查后状态 license_obj_after = License.query.get(license_obj.license_id) print(f" 检查后状态: {license_obj_after.status} ({license_obj_after.get_status_name()})") if result['success']: print(f"✅ 检查执行成功") print(f" 更新数量: {result['updated_count']}") return True else: print(f"❌ 检查执行失败: {result['message']}") return False def test_scheduler_status(app): """测试定时任务状态""" with app.app_context(): from app.utils.scheduler import get_job_status print("\n📊 定时任务状态:") status = get_job_status() if status['running']: print(f"✅ 调度器运行中") print(f" 任务数量: {len(status['jobs'])}") for job in status['jobs']: print(f" - {job['name']}") print(f" 下次执行: {job['next_run_time']}") else: print(f"⚠️ 调度器未运行") return status['running'] def test_batch_check(app): """测试批量检查功能""" with app.app_context(): from app.utils.background_tasks import check_licenses_batch print("\n📈 批量检查所有卡密状态:") result = check_licenses_batch() if result['success']: stats = result.get('statistics', {}) print(f"✅ 批量检查完成") print(f" 已激活但过期: {stats.get('active_but_expired', 0)}") print(f" 已过期且已标记: {stats.get('expired_and_marked', 0)}") print(f" 已激活且有效: {stats.get('active_and_valid', 0)}") print(f" 未激活: {stats.get('inactive', 0)}") print(f" 已禁用: {stats.get('disabled', 0)}") return True else: print(f"❌ 批量检查失败: {result['message']}") return False def cleanup_test_data(app, license_obj): """清理测试数据""" with app.app_context(): # 删除测试卡密 db.session.delete(license_obj) db.session.commit() print(f"\n🧹 已清理测试数据") def main(): """主测试流程""" print("=" * 60) print("卡密过期状态自动更新功能测试") print("=" * 60) # 创建应用 app = create_app('testing') with app.app_context(): db.create_all() try: # 测试1: 创建测试卡密 print("\n[测试 1] 创建测试卡密") license_obj = create_test_license(app) # 测试2: 激活卡密 print("\n[测试 2] 激活卡密") if not test_activate_license(license_obj): print("❌ 激活失败,退出测试") return False # 测试3: 模拟过期 print("\n[测试 3] 模拟卡密过期") test_expire_license(license_obj) # 测试4: 验证过期检查前状态 print("\n[验证] 过期检查前状态") license_before = License.query.get(license_obj.license_id) print(f" 卡密状态: {license_before.status} ({license_before.get_status_name()})") print(f" is_expired(): {license_before.is_expired()}") if license_before.status != 1: print("⚠️ 卡密状态不是已激活,无法继续测试") return False # 测试5: 手动触发过期检查 print("\n[测试 4] 手动触发过期检查") if not test_manual_check(app, license_obj): print("❌ 手动检查失败") return False # 测试6: 验证检查后状态 print("\n[验证] 检查后状态") license_after = License.query.get(license_obj.license_id) print(f" 卡密状态: {license_after.status} ({license_after.get_status_name()})") if license_after.status == 2: print("✅ 卡密状态已正确更新为已过期") else: print(f"❌ 卡密状态未更新,期望2(已过期),实际{license_after.status}") return False # 测试7: 测试批量检查 print("\n[测试 5] 测试批量检查功能") test_batch_check(app) # 测试8: 测试定时任务状态 print("\n[测试 6] 测试定时任务状态") test_scheduler_status(app) # 清理测试数据 cleanup_test_data(app, license_obj) print("\n" + "=" * 60) print("✅ 所有测试通过!过期卡密自动更新功能正常工作") print("=" * 60) return True except Exception as e: print(f"\n❌ 测试过程中发生错误: {str(e)}") import traceback traceback.print_exc() return False if __name__ == '__main__': success = main() sys.exit(0 if success else 1)