Kamixitong/app/api/statistics.py

152 lines
5.3 KiB
Python
Raw Normal View History

2025-11-11 21:39:12 +08:00
from flask import request, jsonify, current_app
from datetime import datetime, timedelta
from sqlalchemy import func, and_, not_, text
from app import db
from app.models import Product, License, Device, Version, Ticket
from . import api_bp
from .license import require_admin
@api_bp.route('/statistics/overview', methods=['GET'])
@require_admin
def get_overview_stats():
    """Return headline counters for products, licenses, devices and tickets.

    Returns:
        A JSON response ``{"success": True, "data": {...}}`` whose ``data``
        holds the ``products``, ``licenses``, ``devices``, ``tickets`` and
        ``activations`` sections. If anything raises, the exception is
        logged with its traceback and a generic error payload is returned
        with HTTP status 500.
    """
    try:
        # Read the clock once so the "today" and "last 7 days" cutoffs are
        # derived from the same instant.
        now = datetime.utcnow()
        week_ago = now - timedelta(days=7)

        # Products: rows with status == 1 are reported as "active".
        total_products = Product.query.count()
        active_products = Product.query.filter_by(status=1).count()

        # Licenses: status == 1 is reported as "active", type == 0 as "trial".
        total_licenses = License.query.count()
        active_licenses = License.query.filter_by(status=1).count()
        trial_licenses = License.query.filter_by(type=0).count()

        # Devices: "online" means verified at least once in the last 7 days.
        total_devices = Device.query.count()
        online_devices = db.session.execute(
            text("SELECT COUNT(*) FROM device WHERE last_verify_time IS NOT NULL AND last_verify_time >= :start_date"),
            {"start_date": week_ago}
        ).scalar()

        # Recent activations: current UTC calendar day, and trailing 7 days.
        today_activations = License.query.filter(
            func.date(License.activate_time) == now.date()
        ).count()
        week_activations = db.session.execute(
            text("SELECT COUNT(*) FROM license WHERE activate_time IS NOT NULL AND activate_time >= :start_date"),
            {"start_date": week_ago}
        ).scalar()

        # Tickets.
        total_tickets = Ticket.query.count()

        return jsonify({
            'success': True,
            'data': {
                'products': {
                    'total': total_products,
                    'active': active_products
                },
                'licenses': {
                    'total': total_licenses,
                    'active': active_licenses,
                    'trial': trial_licenses,
                    # Every license that is not a trial is reported as formal.
                    'formal': total_licenses - trial_licenses
                },
                'devices': {
                    'total': total_devices,
                    'online': online_devices,
                    'offline': total_devices - online_devices
                },
                'tickets': {
                    'total': total_tickets
                },
                'activations': {
                    'today': today_activations,
                    'week': week_activations
                }
            }
        })
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain str(e) message would lose.
        current_app.logger.exception(f"获取总览统计失败: {e}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/statistics/activations', methods=['GET'])
@require_admin
def get_activation_trend():
    """Return a gap-free daily activation series for the trailing window.

    Query parameters:
        days: Window length in days. Defaults to 30 and is clamped to the
            range 1..365.

    Returns:
        A JSON response ``{"success": True, "data": {"activations": [...],
        "period": "<days>"}}``. ``activations`` has one ``{"date", "count"}``
        entry per UTC calendar day from ``today - days`` through today
        inclusive; days without activations are reported with count 0. If
        anything raises, the exception is logged with its traceback and a
        generic error payload is returned with HTTP status 500.
    """
    try:
        days = request.args.get('days', 30, type=int)
        # Cap at one year, and reject zero/negative windows, which would
        # otherwise yield a degenerate or empty series.
        days = max(1, min(days, 365))

        end_date = datetime.utcnow().date()
        first_day = end_date - timedelta(days=days)
        # Start at midnight of the first day: the results are grouped by
        # whole dates, so a bound carrying a time of day would silently
        # drop that day's earlier activations from the first bucket.
        start_date = datetime.combine(first_day, datetime.min.time())

        # Activation counts grouped by calendar date.
        rows = db.session.execute(
            text("SELECT DATE(activate_time) as date, COUNT(license_id) as count FROM license WHERE activate_time IS NOT NULL AND activate_time >= :start_date GROUP BY DATE(activate_time) ORDER BY date"),
            {"start_date": start_date}
        ).fetchall()

        # Unpack positionally rather than via ``row.count`` so the lookup
        # does not depend on how the row type resolves a column that shares
        # its name with the sequence method ``count``.
        counts_by_day = {str(day): total for day, total in rows}

        # Emit every day in the window, filling missing dates with 0.
        activation_data = []
        for offset in range(days + 1):
            day_key = str(first_day + timedelta(days=offset))
            activation_data.append({
                'date': day_key,
                'count': counts_by_day.get(day_key, 0)
            })

        return jsonify({
            'success': True,
            'data': {
                'activations': activation_data,
                'period': f'{days}'
            }
        })
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        current_app.logger.exception(f"获取激活趋势失败: {e}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/statistics/products', methods=['GET'])
@require_admin
def get_product_stats():
    """Return per-product license and device counts.

    Returns:
        A JSON response ``{"success": True, "data": {"products": [...]}}``
        where each entry has ``product_name``, ``license_count`` and
        ``device_count``, ordered by license count descending. If anything
        raises, the exception is logged with its traceback and a generic
        error payload is returned with HTTP status 500.
    """
    try:
        # Both License and Device are outer-joined to Product on product_id,
        # so a product with L licenses and D devices yields L * D joined
        # rows. A plain COUNT would report that product for both columns;
        # COUNT(DISTINCT id) counts each license/device exactly once.
        # NOTE(review): assumes license_id / device_id uniquely identify
        # their rows (they look like primary keys) - confirm in the models.
        license_total = func.count(License.license_id.distinct())
        device_total = func.count(Device.device_id.distinct())

        product_stats = (
            db.session.query(
                Product.product_name,
                license_total.label('license_count'),
                device_total.label('device_count'),
            )
            .outerjoin(License, Product.product_id == License.product_id)
            .outerjoin(Device, Product.product_id == Device.product_id)
            .group_by(Product.product_id, Product.product_name)
            .order_by(license_total.desc())
            .all()
        )

        # Convert result rows into plain dicts for JSON serialisation.
        products_data = [
            {
                'product_name': row.product_name,
                'license_count': row.license_count or 0,
                'device_count': row.device_count or 0
            }
            for row in product_stats
        ]

        return jsonify({
            'success': True,
            'data': {
                'products': products_data
            }
        })
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        current_app.logger.exception(f"获取产品统计失败: {e}")
        return jsonify({'success': False, 'message': '服务器内部错误'}), 500