""" 系统健康检查模块 提供系统状态监控和健康检查功能 """ import os import psutil import time from typing import Dict, Any, List from config import settings from openai import OpenAI class HealthChecker: """系统健康检查器""" def __init__(self): self.checks = { "disk_space": self._check_disk_space, "memory_usage": self._check_memory_usage, "cpu_usage": self._check_cpu_usage, "openai_api": self._check_openai_api, "redis_connection": self._check_redis_connection, "file_permissions": self._check_file_permissions } def _check_disk_space(self) -> Dict[str, Any]: """检查磁盘空间""" try: disk = psutil.disk_usage('/') free_percent = (disk.free / disk.total) * 100 status = "healthy" if free_percent > 10 else "warning" if free_percent > 5 else "critical" return { "name": "磁盘空间", "status": status, "details": { "total": f"{disk.total / (1024**3):.2f} GB", "used": f"{disk.used / (1024**3):.2f} GB", "free": f"{disk.free / (1024**3):.2f} GB", "free_percent": f"{free_percent:.2f}%" } } except Exception as e: return { "name": "磁盘空间", "status": "error", "details": {"error": str(e)} } def _check_memory_usage(self) -> Dict[str, Any]: """检查内存使用情况""" try: memory = psutil.virtual_memory() usage_percent = memory.percent status = "healthy" if usage_percent < 80 else "warning" if usage_percent < 90 else "critical" return { "name": "内存使用", "status": status, "details": { "total": f"{memory.total / (1024**3):.2f} GB", "available": f"{memory.available / (1024**3):.2f} GB", "used": f"{memory.used / (1024**3):.2f} GB", "usage_percent": f"{usage_percent:.2f}%" } } except Exception as e: return { "name": "内存使用", "status": "error", "details": {"error": str(e)} } def _check_cpu_usage(self) -> Dict[str, Any]: """检查CPU使用情况""" try: # 获取1秒内的CPU使用率 cpu_percent = psutil.cpu_percent(interval=1) status = "healthy" if cpu_percent < 80 else "warning" if cpu_percent < 90 else "critical" return { "name": "CPU使用", "status": status, "details": { "usage_percent": f"{cpu_percent:.2f}%", "core_count": psutil.cpu_count() } } except Exception as e: return { "name": "CPU使用", "status": "error", "details": {"error": str(e)} } def _check_openai_api(self) -> Dict[str, Any]: """检查OpenAI API连接""" try: # 根据提供商选择API密钥和基础URL api_key = None api_base = None provider_name = "Unknown" if settings.EMBEDDING_PROVIDER == "openai": api_key = settings.OPENAI_API_KEY api_base = settings.OPENAI_API_BASE provider_name = "OpenAI" elif settings.EMBEDDING_PROVIDER == "anthropic": api_key = settings.ANTHROPIC_API_KEY api_base = settings.ANTHROPIC_API_BASE provider_name = "Anthropic" elif settings.EMBEDDING_PROVIDER == "qwen": api_key = settings.QWEN_API_KEY api_base = settings.QWEN_API_BASE provider_name = "通义千问" if not api_key: return { "name": f"{provider_name} API", "status": "warning", "details": {"message": f"{provider_name} API密钥未配置"} } client = OpenAI(api_key=api_key, base_url=api_base) # 尝试获取模型列表来验证连接 start_time = time.time() models = client.models.list() response_time = (time.time() - start_time) * 1000 # 转换为毫秒 status = "healthy" if response_time < 1000 else "warning" if response_time < 3000 else "critical" return { "name": f"{provider_name} API", "status": status, "details": { "response_time_ms": f"{response_time:.2f}", "models_available": len(list(models)), "provider": settings.EMBEDDING_PROVIDER, "api_base": api_base } } except Exception as e: return { "name": f"{settings.EMBEDDING_PROVIDER.upper()} API", "status": "error", "details": {"error": str(e)} } def _check_redis_connection(self) -> Dict[str, Any]: """检查Redis连接""" try: import redis # 尝试连接Redis start_time = time.time() r = redis.Redis.from_url(settings.CELERY_BROKER_URL) r.ping() response_time = (time.time() - start_time) * 1000 # 转换为毫秒 status = "healthy" if response_time < 100 else "warning" if response_time < 500 else "critical" return { "name": "Redis连接", "status": status, "details": { "response_time_ms": f"{response_time:.2f}", "broker_url": settings.CELERY_BROKER_URL } } except ImportError: return { "name": "Redis连接", "status": "warning", "details": {"message": "Redis库未安装"} } except Exception as e: return { "name": "Redis连接", "status": "error", "details": {"error": str(e)} } def _check_file_permissions(self) -> Dict[str, Any]: """检查文件权限""" try: required_paths = [ ".", "uploads", "exports", "logs" ] issues = [] for path in required_paths: if not os.path.exists(path): issues.append(f"路径不存在: {path}") elif not os.access(path, os.R_OK | os.W_OK): issues.append(f"权限不足: {path}") status = "healthy" if not issues else "warning" return { "name": "文件权限", "status": status, "details": { "issues": issues if issues else "所有路径权限正常" } } except Exception as e: return { "name": "文件权限", "status": "error", "details": {"error": str(e)} } def run_health_check(self) -> Dict[str, Any]: """运行完整的健康检查""" results = [] overall_status = "healthy" for check_name, check_func in self.checks.items(): try: result = check_func() results.append(result) # 更新整体状态(严重程度: error > critical > warning > healthy) status_priority = {"error": 4, "critical": 3, "warning": 2, "healthy": 1} if status_priority.get(result["status"], 0) > status_priority.get(overall_status, 0): overall_status = result["status"] except Exception as e: results.append({ "name": check_name, "status": "error", "details": {"error": f"检查函数执行失败: {str(e)}"} }) overall_status = "error" return { "timestamp": time.time(), "overall_status": overall_status, "checks": results } def get_system_info(self) -> Dict[str, Any]: """获取系统信息""" try: return { "platform": os.name, "python_version": ".".join(map(str, __import__('sys').version_info[:3])), "cpu_count": psutil.cpu_count(), "memory_total": f"{psutil.virtual_memory().total / (1024**3):.2f} GB", "disk_total": f"{psutil.disk_usage('/').total / (1024**3):.2f} GB" } except Exception as e: return {"error": str(e)} # 创建全局健康检查器实例 health_checker = HealthChecker()