"""
System health check endpoints.

Provides system status monitoring and health check functionality.
"""
import os
import platform
from datetime import datetime, timezone

import psutil
from flask import Blueprint, current_app, jsonify
from sqlalchemy import text

from app import db
from app.models import Admin, Device, License, Product

health_bp = Blueprint('health', __name__)


def _utc_timestamp():
    """Return the current UTC time as an ISO 8601 string."""
    # datetime.utcnow() is deprecated since Python 3.12, so use an aware datetime.
    return datetime.now(timezone.utc).isoformat()


@health_bp.route('/health', methods=['GET'])
def health_check():
    """Basic health check."""
    return jsonify({
        'status': 'healthy',
        'timestamp': _utc_timestamp(),
        'service': 'KaMiXiTong'
    }), 200


@health_bp.route('/health/detailed', methods=['GET'])
def detailed_health_check():
    """
    Detailed health check.

    Checks the database connection, disk space, memory usage, and record counts.
    """
    health_status = {
        'status': 'healthy',
        'timestamp': _utc_timestamp(),
        'service': 'KaMiXiTong',
        'checks': {}
    }
    overall_healthy = True

    # 1. Database connectivity check
    try:
        db.session.execute(text('SELECT 1'))
        health_status['checks']['database'] = {
            'status': 'healthy',
            'message': 'Database connection OK'
        }
    except Exception as e:
        # Roll back so a failed transaction does not poison the later queries.
        db.session.rollback()
        overall_healthy = False
        health_status['checks']['database'] = {
            'status': 'unhealthy',
            'message': f'Database connection failed: {e}'
        }

    # 2. Disk space check: above 90% is unhealthy, above 80% is a warning
    try:
        disk_usage = psutil.disk_usage('/')
        disk_percent = disk_usage.percent
        if disk_percent > 90:
            overall_healthy = False
            disk_status = 'unhealthy'
        elif disk_percent > 80:
            disk_status = 'warning'
        else:
            disk_status = 'healthy'
        health_status['checks']['disk'] = {
            'status': disk_status,
            'usage_percent': disk_percent,
            'free_gb': round(disk_usage.free / (1024**3), 2),
            'total_gb': round(disk_usage.total / (1024**3), 2),
            'message': f'Disk usage: {disk_percent}%'
        }
    except Exception as e:
        # A failed probe is reported as 'unknown' and does not fail the check.
        health_status['checks']['disk'] = {
            'status': 'unknown',
            'message': f'Unable to read disk information: {e}'
        }

    # 3. Memory usage check: same thresholds as the disk check
    try:
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        if memory_percent > 90:
            overall_healthy = False
            memory_status = 'unhealthy'
        elif memory_percent > 80:
            memory_status = 'warning'
        else:
            memory_status = 'healthy'
        health_status['checks']['memory'] = {
            'status': memory_status,
            'usage_percent': memory_percent,
            'available_gb': round(memory.available / (1024**3), 2),
            'total_gb': round(memory.total / (1024**3), 2),
            'message': f'Memory usage: {memory_percent}%'
        }
    except Exception as e:
        health_status['checks']['memory'] = {
            'status': 'unknown',
            'message': f'Unable to read memory information: {e}'
        }

    # 4. Record counts
    try:
        stats = {
            'products': Product.query.count(),
            'licenses': License.query.count(),
            'devices': Device.query.count(),
            'admins': Admin.query.count()
        }
        health_status['checks']['statistics'] = {
            'status': 'healthy',
            'data': stats
        }
    except Exception as e:
        db.session.rollback()
        overall_healthy = False
        health_status['checks']['statistics'] = {
            'status': 'unhealthy',
            'message': f'Statistics query failed: {e}'
        }

    # 5. System information
    health_status['system'] = {
        'os': platform.system(),
        'os_version': platform.version(),
        'python_version': platform.python_version(),
        'hostname': platform.node(),
        'process_id': os.getpid()
    }

    # Set the overall status
    health_status['status'] = 'healthy' if overall_healthy else 'unhealthy'
    status_code = 200 if overall_healthy else 503
    return jsonify(health_status), status_code


@health_bp.route('/health/ready', methods=['GET'])
def readiness_check():
    """Readiness check: is the application ready to receive traffic?"""
    try:
        # Check the database connection
        db.session.execute(text('SELECT 1'))

        # Check required configuration
        required_config = ['SECRET_KEY', 'AUTH_SECRET_KEY', 'SQLALCHEMY_DATABASE_URI']
        missing_config = [
            config_key for config_key in required_config
            if not current_app.config.get(config_key)
        ]
        if missing_config:
            return jsonify({
                'status': 'not_ready',
                'message': f'Missing configuration: {", ".join(missing_config)}'
            }), 503

        return jsonify({
            'status': 'ready',
            'message': 'Application is ready'
        }), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'status': 'not_ready',
            'message': f'Readiness check failed: {e}'
        }), 503


@health_bp.route('/health/live', methods=['GET'])
def liveness_check():
    """Liveness check: is the application still running?"""
    return jsonify({
        'status': 'alive',
        'timestamp': _utc_timestamp()
    }), 200