"""Statistics API endpoints; every route here is wrapped with ``require_admin``."""
from flask import request, jsonify, current_app
from datetime import datetime, timedelta
from sqlalchemy import func, and_, not_, text
from app import db
from app.models import Product, License, Device, Version, Ticket
from . import api_bp
from .license import require_admin


@api_bp.route('/statistics/overview', methods=['GET'])
@require_admin
def get_overview_stats():
    """Return headline counts for products, licenses, devices, tickets and activations.

    Returns:
        A JSON response ``{'success': True, 'data': {...}}`` on success, or a
        ``({'success': False, 'message': ...}, 500)`` pair if any query raises.
    """
    try:
        # Sample the clock once so every window in this response shares the
        # same reference instant (no drift across a midnight boundary).
        now = datetime.utcnow()
        week_ago = now - timedelta(days=7)

        # Products: rows with status == 1 are reported as "active".
        total_products = Product.query.count()
        active_products = Product.query.filter_by(status=1).count()

        # Licenses: status == 1 is reported as "active", type == 0 as "trial".
        total_licenses = License.query.count()
        active_licenses = License.query.filter_by(status=1).count()
        trial_licenses = License.query.filter_by(type=0).count()

        # Devices: "online" means verified at least once in the last 7 days.
        total_devices = Device.query.count()
        online_devices = db.session.execute(
            text(
                "SELECT COUNT(*) FROM device "
                "WHERE last_verify_time IS NOT NULL "
                "AND last_verify_time >= :start_date"
            ),
            {"start_date": week_ago},
        ).scalar()

        # Recent activations: today (UTC calendar day) and the last 7 days.
        today_activations = License.query.filter(
            func.date(License.activate_time) == now.date()
        ).count()
        week_activations = db.session.execute(
            text(
                "SELECT COUNT(*) FROM license "
                "WHERE activate_time IS NOT NULL "
                "AND activate_time >= :start_date"
            ),
            {"start_date": week_ago},
        ).scalar()

        # Tickets.
        total_tickets = Ticket.query.count()

        return jsonify({
            'success': True,
            'data': {
                'products': {
                    'total': total_products,
                    'active': active_products,
                },
                'licenses': {
                    'total': total_licenses,
                    'active': active_licenses,
                    'trial': trial_licenses,
                    # Everything that is not a trial license is reported as formal.
                    'formal': total_licenses - trial_licenses,
                },
                'devices': {
                    'total': total_devices,
                    'online': online_devices,
                    'offline': total_devices - online_devices,
                },
                'tickets': {
                    'total': total_tickets,
                },
                'activations': {
                    'today': today_activations,
                    'week': week_activations,
                },
            },
        })

    except Exception as e:
        current_app.logger.error(f"获取总览统计失败: {str(e)}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500


@api_bp.route('/statistics/activations', methods=['GET'])
@require_admin
def get_activation_trend():
    """Return the number of license activations per day over a recent window.

    Query parameters:
        days: Size of the window in days. Defaults to 30 (also used when the
            value is not an integer) and is clamped to the range 1..365.

    Returns:
        A JSON response whose ``data.activations`` is a list of
        ``{'date': 'YYYY-MM-DD', 'count': int}`` entries, one per calendar day
        from the window start through today inclusive (days with no
        activations are reported as 0), or a 500 error pair if a query raises.
    """
    try:
        days = request.args.get('days', 30, type=int)
        # Clamp to 1..365: a zero/negative window would place the start in the
        # future and silently produce an empty series; a year is the maximum.
        days = max(1, min(days, 365))

        # Single clock sample so the SQL cutoff and the fill loop agree.
        now = datetime.utcnow()
        start_date = now - timedelta(days=days)

        # Activation counts grouped by calendar date. The cutoff is a
        # timestamp, so the first day of the window may be only partially
        # covered.
        rows = db.session.execute(
            text(
                "SELECT DATE(activate_time) as date, COUNT(license_id) as count "
                "FROM license "
                "WHERE activate_time IS NOT NULL AND activate_time >= :start_date "
                "GROUP BY DATE(activate_time) ORDER BY date"
            ),
            {"start_date": start_date},
        ).fetchall()

        # Unpack positionally instead of using ``row.count``: on SQLAlchemy
        # 1.4+ rows are sequences, so ``count`` resolves to the sequence
        # method rather than the column aliased "count".
        counts_by_day = {str(day): count for day, count in rows}

        # Fill in the dates that had no activations.
        activation_data = []
        current_date = start_date.date()
        end_date = now.date()
        while current_date <= end_date:
            day_key = str(current_date)
            activation_data.append({
                'date': day_key,
                'count': counts_by_day.get(day_key, 0),
            })
            current_date += timedelta(days=1)

        return jsonify({
            'success': True,
            'data': {
                'activations': activation_data,
                'period': f'{days}天',
            },
        })

    except Exception as e:
        current_app.logger.error(f"获取激活趋势失败: {str(e)}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500


@api_bp.route('/statistics/products', methods=['GET'])
@require_admin
def get_product_stats():
    """Return license and device counts for every product.

    Returns:
        A JSON response whose ``data.products`` is a list of
        ``{'product_name', 'license_count', 'device_count'}`` entries ordered
        by license count descending, or a 500 error pair if the query raises.
    """
    try:
        # Joining both License and Device onto Product yields one row per
        # (license, device) pair, so a plain COUNT would report
        # licenses x devices for each side. COUNT(DISTINCT ...) undoes that
        # fan-out.
        # NOTE(review): assumes license_id / device_id uniquely identify rows
        # in their tables (they look like primary keys) - confirm in models.
        license_count = func.count(License.license_id.distinct())
        device_count = func.count(Device.device_id.distinct())

        product_stats = (
            db.session.query(
                Product.product_name,
                license_count.label('license_count'),
                device_count.label('device_count'),
            )
            .outerjoin(License, Product.product_id == License.product_id)
            .outerjoin(Device, Product.product_id == Device.product_id)
            .group_by(Product.product_id, Product.product_name)
            .order_by(license_count.desc())
            .all()
        )

        # Convert result rows into plain dicts for JSON serialization.
        products_data = [
            {
                'product_name': product.product_name,
                'license_count': product.license_count or 0,
                'device_count': product.device_count or 0,
            }
            for product in product_stats
        ]

        return jsonify({
            'success': True,
            'data': {
                'products': products_data,
            },
        })

    except Exception as e:
        current_app.logger.error(f"获取产品统计失败: {str(e)}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500