Kamixitong/app/api/statistics.py

174 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import request, jsonify, current_app
from datetime import datetime, timedelta, time
from sqlalchemy import func, and_, not_, text, case
from app import db
from app.models import Product, License, Device, Version, Ticket
from . import api_bp
from .decorators import require_login, require_admin
@api_bp.route('/statistics/overview', methods=['GET'])
@require_login
def get_overview_stats():
"""获取总览统计"""
try:
# 优化:使用单个查询获取所有产品统计
product_stats = db.session.query(
func.count(Product.product_id).label('total'),
func.sum(case((Product.status == 1, 1), else_=0)).label('active')
).first()
total_products = product_stats.total or 0
active_products = product_stats.active or 0
# 优化:使用单个查询获取所有卡密统计
license_stats = db.session.query(
func.count(License.license_id).label('total'),
func.sum(case((License.status == 1, 1), else_=0)).label('active'),
func.sum(case((License.type == 0, 1), else_=0)).label('trial')
).first()
total_licenses = license_stats.total or 0
active_licenses = license_stats.active or 0
trial_licenses = license_stats.trial or 0
# 优化:使用单个查询获取设备统计
device_stats = db.session.query(
func.count(Device.device_id).label('total')
).first()
total_devices = device_stats.total or 0
# 在线设备统计7天内验证过
start_date = datetime.utcnow() - timedelta(days=7)
online_devices = db.session.execute(
text("SELECT COUNT(*) FROM device WHERE last_verify_time IS NOT NULL AND last_verify_time >= :start_date"),
{"start_date": start_date}
).scalar() or 0
# 优化:使用单个查询获取激活统计
today = datetime.utcnow().date()
today_start = datetime.combine(today, time.min)
today_end = datetime.combine(today, time.max)
# 今日激活数使用日期范围查询兼容SQLite
today_activations = db.session.query(func.count(License.license_id)).filter(
License.activate_time.isnot(None),
License.activate_time >= today_start,
License.activate_time <= today_end
).scalar() or 0
# 本周激活数
week_activations = db.session.query(func.count(License.license_id)).filter(
License.activate_time.isnot(None),
License.activate_time >= start_date
).scalar() or 0
# 工单统计
total_tickets = db.session.query(func.count(Ticket.ticket_id)).scalar() or 0
return jsonify({
'success': True,
'data': {
'products': {
'total': total_products,
'active': active_products
},
'licenses': {
'total': total_licenses,
'active': active_licenses,
'trial': trial_licenses,
'formal': total_licenses - trial_licenses
},
'devices': {
'total': total_devices,
'online': online_devices,
'offline': total_devices - online_devices
},
'tickets': {
'total': total_tickets
},
'activations': {
'today': today_activations,
'week': week_activations
}
}
})
except Exception as e:
current_app.logger.error(f"获取总览统计失败: {str(e)}")
return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/statistics/activations', methods=['GET'])
@require_login
def get_activation_trend():
"""获取激活趋势"""
try:
days = request.args.get('days', 30, type=int)
days = min(days, 365) # 最多查询一年
start_date = datetime.utcnow() - timedelta(days=days)
# 按日期分组统计激活数
result = db.session.execute(
text("SELECT DATE(activate_time) as date, COUNT(license_id) as count FROM license WHERE activate_time IS NOT NULL AND activate_time >= :start_date GROUP BY DATE(activate_time) ORDER BY date"),
{"start_date": start_date}
).fetchall()
# 填充缺失的日期
activation_data = []
current_date = start_date.date()
end_date = datetime.utcnow().date()
result_dict = {str(r.date): r.count for r in result}
while current_date <= end_date:
activation_data.append({
'date': str(current_date),
'count': result_dict.get(str(current_date), 0)
})
current_date += timedelta(days=1)
return jsonify({
'success': True,
'data': {
'activations': activation_data,
'period': f'{days}'
}
})
except Exception as e:
current_app.logger.error(f"获取激活趋势失败: {str(e)}")
return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/statistics/products', methods=['GET'])
@require_login
def get_product_stats():
"""获取产品统计信息"""
try:
# 获取产品分布数据
product_stats = db.session.query(
Product.product_name,
func.count(License.license_id).label('license_count'),
func.count(Device.device_id).label('device_count')
).outerjoin(License, Product.product_id == License.product_id)\
.outerjoin(Device, Product.product_id == Device.product_id)\
.group_by(Product.product_id, Product.product_name)\
.order_by(func.count(License.license_id).desc())\
.all()
# 转换为字典列表
products_data = []
for product in product_stats:
products_data.append({
'product_name': product.product_name,
'license_count': product.license_count or 0,
'device_count': product.device_count or 0
})
return jsonify({
'success': True,
'data': {
'products': products_data
}
})
except Exception as e:
current_app.logger.error(f"获取产品统计失败: {str(e)}")
return jsonify({'success': False, 'message': '服务器内部错误'}), 500