from flask import request, jsonify, current_app
from datetime import datetime, timedelta, time
from sqlalchemy import func, and_, not_, text, case
from app import db
from app.models import Product, License, Device, Version, Ticket
from . import api_bp
from .decorators import require_login, require_admin


@api_bp.route('/statistics/overview', methods=['GET'])
@require_login
def get_overview_stats():
    """Return headline counters for products, licenses, devices and tickets.

    Responds with a JSON document of the form
    ``{'success': True, 'data': {...}}`` where ``data`` holds the
    ``products``, ``licenses``, ``devices``, ``tickets`` and ``activations``
    sections. Any exception is logged and turned into a generic HTTP 500
    JSON error response.
    """
    try:
        # Take a single timestamp so every window below is measured from the
        # same instant.
        now = datetime.utcnow()

        # Product totals in one aggregate query (status == 1 counts as active).
        product_stats = db.session.query(
            func.count(Product.product_id).label('total'),
            func.sum(case((Product.status == 1, 1), else_=0)).label('active'),
        ).first()
        total_products = product_stats.total or 0
        active_products = product_stats.active or 0

        # License totals in one aggregate query. SUM() yields NULL on an
        # empty table, hence the ``or 0`` fallbacks.
        license_stats = db.session.query(
            func.count(License.license_id).label('total'),
            func.sum(case((License.status == 1, 1), else_=0)).label('active'),
            func.sum(case((License.type == 0, 1), else_=0)).label('trial'),
        ).first()
        total_licenses = license_stats.total or 0
        active_licenses = license_stats.active or 0
        trial_licenses = license_stats.trial or 0

        # Device total.
        device_stats = db.session.query(
            func.count(Device.device_id).label('total')
        ).first()
        total_devices = device_stats.total or 0

        # "Online" devices: verified at least once within the last 7 days.
        start_date = now - timedelta(days=7)
        online_devices = db.session.execute(
            text("SELECT COUNT(*) FROM device WHERE last_verify_time IS NOT NULL AND last_verify_time >= :start_date"),
            {"start_date": start_date}
        ).scalar() or 0

        # Boundaries of the current UTC day.
        today = now.date()
        today_start = datetime.combine(today, time.min)
        today_end = datetime.combine(today, time.max)

        # Activations today (explicit range comparison keeps this portable
        # to SQLite).
        today_activations = db.session.query(func.count(License.license_id)).filter(
            License.activate_time.isnot(None),
            License.activate_time >= today_start,
            License.activate_time <= today_end
        ).scalar() or 0

        # Activations in the rolling 7-day window.
        week_activations = db.session.query(func.count(License.license_id)).filter(
            License.activate_time.isnot(None),
            License.activate_time >= start_date
        ).scalar() or 0

        # Ticket total.
        total_tickets = db.session.query(func.count(Ticket.ticket_id)).scalar() or 0

        return jsonify({
            'success': True,
            'data': {
                'products': {
                    'total': total_products,
                    'active': active_products
                },
                'licenses': {
                    'total': total_licenses,
                    'active': active_licenses,
                    'trial': trial_licenses,
                    'formal': total_licenses - trial_licenses
                },
                'devices': {
                    'total': total_devices,
                    'online': online_devices,
                    'offline': total_devices - online_devices
                },
                'tickets': {
                    'total': total_tickets
                },
                'activations': {
                    'today': today_activations,
                    'week': week_activations
                }
            }
        })

    except Exception as e:
        # exc_info keeps the traceback in the log; the message is unchanged.
        current_app.logger.error(f"获取总览统计失败: {str(e)}", exc_info=True)
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500


@api_bp.route('/statistics/activations', methods=['GET'])
@require_login
def get_activation_trend():
    """Return the number of license activations per day.

    Query parameters:
        days: size of the look-back window in days (default 30). Values are
            clamped to the range 1..365.

    Responds with ``{'success': True, 'data': {'activations': [...],
    'period': ...}}`` where ``activations`` contains one
    ``{'date', 'count'}`` entry for every day from ``days`` days ago up to
    and including today (UTC); days without activations get a count of 0.
    Any exception is logged and turned into a generic HTTP 500 JSON error.
    """
    try:
        days = request.args.get('days', 30, type=int)
        # Cap at one year, and reject zero/negative windows which would
        # otherwise yield an empty or meaningless series.
        days = max(1, min(days, 365))

        end_date = datetime.utcnow().date()
        first_day = end_date - timedelta(days=days)
        # Start at midnight so the oldest bucket covers the whole day
        # instead of only the part after the current time of day.
        start_date = datetime.combine(first_day, time.min)

        # Activation counts grouped by calendar date.
        result = db.session.execute(
            text("SELECT DATE(activate_time) as date, COUNT(license_id) as count FROM license WHERE activate_time IS NOT NULL AND activate_time >= :start_date GROUP BY DATE(activate_time) ORDER BY date"),
            {"start_date": start_date}
        ).fetchall()

        # Unpack rows positionally: attribute access to a column called
        # "count" collides with the tuple-style ``count`` method on rows.
        counts_by_day = {str(day): count for day, count in result}

        # Fill in the days that had no activations.
        activation_data = []
        current_date = first_day
        while current_date <= end_date:
            day_key = str(current_date)
            activation_data.append({
                'date': day_key,
                'count': counts_by_day.get(day_key, 0)
            })
            current_date += timedelta(days=1)

        return jsonify({
            'success': True,
            'data': {
                'activations': activation_data,
                'period': f'{days}天'
            }
        })

    except Exception as e:
        # exc_info keeps the traceback in the log; the message is unchanged.
        current_app.logger.error(f"获取激活趋势失败: {str(e)}", exc_info=True)
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500


@api_bp.route('/statistics/products', methods=['GET'])
@require_login
def get_product_stats():
    """Return per-product license and device counts.

    Responds with ``{'success': True, 'data': {'products': [...]}}`` where
    each entry carries ``product_name``, ``license_count`` and
    ``device_count``, ordered by license count descending. Products without
    licenses or devices are included with counts of 0. Any exception is
    logged and turned into a generic HTTP 500 JSON error.
    """
    try:
        # Joining both License and Device to Product produces
        # licenses x devices rows per product, so a plain COUNT() would
        # inflate both figures. COUNT(DISTINCT ...) counts each row once.
        # NOTE(review): assumes license_id / device_id are unique keys.
        license_count = func.count(License.license_id.distinct())
        device_count = func.count(Device.device_id.distinct())

        product_stats = (
            db.session.query(
                Product.product_name,
                license_count.label('license_count'),
                device_count.label('device_count'),
            )
            .outerjoin(License, Product.product_id == License.product_id)
            .outerjoin(Device, Product.product_id == Device.product_id)
            .group_by(Product.product_id, Product.product_name)
            .order_by(license_count.desc())
            .all()
        )

        # Convert result rows into plain dictionaries for JSON output.
        products_data = [
            {
                'product_name': product.product_name,
                'license_count': product.license_count or 0,
                'device_count': product.device_count or 0
            }
            for product in product_stats
        ]

        return jsonify({
            'success': True,
            'data': {
                'products': products_data
            }
        })

    except Exception as e:
        # exc_info keeps the traceback in the log; the message is unchanged.
        current_app.logger.error(f"获取产品统计失败: {str(e)}", exc_info=True)
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500