nodebookls/health_check.py

262 lines
9.2 KiB
Python
Raw Permalink Normal View History

2025-10-29 13:56:24 +08:00
"""
系统健康检查模块
提供系统状态监控和健康检查功能
"""
import os
import psutil
import time
from typing import Dict, Any, List
from config import settings
from openai import OpenAI
class HealthChecker:
    """Run a set of host and dependency probes and aggregate their results.

    Every probe returns a dict with three keys: ``name`` (display label),
    ``status`` (``"healthy"``, ``"warning"``, ``"critical"`` or ``"error"``)
    and ``details`` (probe-specific payload). Probes catch their own
    exceptions and report them as ``"error"`` results instead of raising.
    """

    # Severity ranking used to derive the overall status; a status that is
    # not listed here ranks 0 and therefore never becomes the overall status.
    _STATUS_PRIORITY = {"error": 4, "critical": 3, "warning": 2, "healthy": 1}

    # EMBEDDING_PROVIDER value -> (display name, settings attribute holding
    # the API key, settings attribute holding the base URL).
    _PROVIDERS = {
        "openai": ("OpenAI", "OPENAI_API_KEY", "OPENAI_API_BASE"),
        "anthropic": ("Anthropic", "ANTHROPIC_API_KEY", "ANTHROPIC_API_BASE"),
        "qwen": ("通义千问", "QWEN_API_KEY", "QWEN_API_BASE"),
    }

    # Timeout (seconds) handed to the OpenAI and Redis clients so that an
    # unresponsive dependency cannot stall the health check indefinitely.
    _NETWORK_TIMEOUT_SECONDS = 10.0

    def __init__(self):
        """Register the built-in probes, keyed by check identifier."""
        self.checks = {
            "disk_space": self._check_disk_space,
            "memory_usage": self._check_memory_usage,
            "cpu_usage": self._check_cpu_usage,
            "openai_api": self._check_openai_api,
            "redis_connection": self._check_redis_connection,
            "file_permissions": self._check_file_permissions,
        }

    @staticmethod
    def _error_result(name: str, exc: Exception) -> Dict[str, Any]:
        """Build the standard ``"error"`` result for a probe that raised."""
        return {
            "name": name,
            "status": "error",
            "details": {"error": str(exc)},
        }

    @staticmethod
    def _classify(value: float, warn_at: float, critical_at: float) -> str:
        """Grade a "lower is better" measurement.

        Returns ``"healthy"`` below ``warn_at``, ``"warning"`` below
        ``critical_at`` and ``"critical"`` otherwise.
        """
        if value < warn_at:
            return "healthy"
        if value < critical_at:
            return "warning"
        return "critical"

    @staticmethod
    def _format_gb(num_bytes: float) -> str:
        """Format a byte count as a two-decimal ``"<n> GB"`` string (1024**3 bytes)."""
        return f"{num_bytes / (1024**3):.2f} GB"

    @staticmethod
    def _redact_url_password(url):
        """Return ``url`` with the password in its userinfo replaced by ``***``.

        URLs without a password, non-string values and URLs that cannot be
        parsed are returned unchanged.
        """
        from urllib.parse import urlsplit, urlunsplit

        if not isinstance(url, str):
            return url
        try:
            parts = urlsplit(url)
            if parts.password is None:
                return url
        except ValueError:
            return url
        userinfo, _, hostinfo = parts.netloc.rpartition("@")
        username = userinfo.partition(":")[0]
        return urlunsplit(parts._replace(netloc=f"{username}:***@{hostinfo}"))

    def _check_disk_space(self) -> Dict[str, Any]:
        """Report usage of the filesystem mounted at ``/``.

        Status is ``"healthy"`` above 10% free, ``"warning"`` above 5% free
        and ``"critical"`` otherwise.
        """
        try:
            disk = psutil.disk_usage('/')
            free_percent = (disk.free / disk.total) * 100
            if free_percent > 10:
                status = "healthy"
            elif free_percent > 5:
                status = "warning"
            else:
                status = "critical"
            return {
                "name": "磁盘空间",
                "status": status,
                "details": {
                    "total": self._format_gb(disk.total),
                    "used": self._format_gb(disk.used),
                    "free": self._format_gb(disk.free),
                    "free_percent": f"{free_percent:.2f}%",
                },
            }
        except Exception as e:
            return self._error_result("磁盘空间", e)

    def _check_memory_usage(self) -> Dict[str, Any]:
        """Report virtual memory usage (warning at 80%, critical at 90%)."""
        try:
            memory = psutil.virtual_memory()
            usage_percent = memory.percent
            return {
                "name": "内存使用",
                "status": self._classify(usage_percent, 80, 90),
                "details": {
                    "total": self._format_gb(memory.total),
                    "available": self._format_gb(memory.available),
                    "used": self._format_gb(memory.used),
                    "usage_percent": f"{usage_percent:.2f}%",
                },
            }
        except Exception as e:
            return self._error_result("内存使用", e)

    def _check_cpu_usage(self) -> Dict[str, Any]:
        """Report CPU utilisation (warning at 80%, critical at 90%).

        Blocks for one second while psutil samples the CPU.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            return {
                "name": "CPU使用",
                "status": self._classify(cpu_percent, 80, 90),
                "details": {
                    "usage_percent": f"{cpu_percent:.2f}%",
                    "core_count": psutil.cpu_count(),
                },
            }
        except Exception as e:
            return self._error_result("CPU使用", e)

    def _check_openai_api(self) -> Dict[str, Any]:
        """Probe the configured embedding provider by listing its models.

        The provider is chosen by ``settings.EMBEDDING_PROVIDER``. A missing
        API key (or an unrecognised provider) yields a ``"warning"`` result.
        Otherwise the status is graded on the latency of the model-list
        request: warning at 1000 ms, critical at 3000 ms.
        """
        # Bound before the try block so the error path reports the same
        # display name as the success path and cannot raise while building it.
        provider_name = "Unknown"
        try:
            provider = settings.EMBEDDING_PROVIDER
            api_key = None
            api_base = None
            if provider in self._PROVIDERS:
                provider_name, key_attr, base_attr = self._PROVIDERS[provider]
                api_key = getattr(settings, key_attr)
                api_base = getattr(settings, base_attr)

            if not api_key:
                return {
                    "name": f"{provider_name} API",
                    "status": "warning",
                    "details": {"message": f"{provider_name} API密钥未配置"},
                }

            client = OpenAI(
                api_key=api_key,
                base_url=api_base,
                timeout=self._NETWORK_TIMEOUT_SECONDS,
            )
            # Listing models is used as a lightweight connectivity check.
            started = time.perf_counter()
            models = client.models.list()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds
            return {
                "name": f"{provider_name} API",
                "status": self._classify(response_time, 1000, 3000),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    "models_available": len(list(models)),
                    "provider": provider,
                    "api_base": api_base,
                },
            }
        except Exception as e:
            return self._error_result(f"{provider_name} API", e)

    def _check_redis_connection(self) -> Dict[str, Any]:
        """Ping the server behind ``settings.CELERY_BROKER_URL``.

        Status is graded on the round-trip time of connect + ``PING``:
        warning at 100 ms, critical at 500 ms. If the ``redis`` package is
        not importable the result is a ``"warning"``. The broker URL is
        reported with any embedded password redacted.
        """
        client = None
        try:
            import redis

            started = time.perf_counter()
            client = redis.Redis.from_url(
                settings.CELERY_BROKER_URL,
                socket_timeout=self._NETWORK_TIMEOUT_SECONDS,
            )
            client.ping()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds
            return {
                "name": "Redis连接",
                "status": self._classify(response_time, 100, 500),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    "broker_url": self._redact_url_password(settings.CELERY_BROKER_URL),
                },
            }
        except ImportError:
            return {
                "name": "Redis连接",
                "status": "warning",
                "details": {"message": "Redis库未安装"},
            }
        except Exception as e:
            return self._error_result("Redis连接", e)
        finally:
            # Release the sockets opened for this probe. Cleanup is best
            # effort: a failure here must not replace the probe result.
            if client is not None:
                try:
                    client.connection_pool.disconnect()
                except Exception:
                    pass

    def _check_file_permissions(self) -> Dict[str, Any]:
        """Check that the working directories exist and are readable and writable.

        Paths are resolved relative to the current working directory. Any
        missing or inaccessible path downgrades the status to ``"warning"``.
        """
        try:
            required_paths = (".", "uploads", "exports", "logs")
            issues = []
            for path in required_paths:
                if not os.path.exists(path):
                    issues.append(f"路径不存在: {path}")
                elif not os.access(path, os.R_OK | os.W_OK):
                    issues.append(f"权限不足: {path}")
            return {
                "name": "文件权限",
                "status": "healthy" if not issues else "warning",
                "details": {
                    "issues": issues if issues else "所有路径权限正常",
                },
            }
        except Exception as e:
            return self._error_result("文件权限", e)

    def run_health_check(self) -> Dict[str, Any]:
        """Run every registered check and aggregate the results.

        Returns:
            A dict with ``timestamp`` (``time.time()`` at completion),
            ``overall_status`` (the most severe status seen, ordered
            error > critical > warning > healthy) and ``checks`` (one result
            per registered check, in registration order).

        A check that raises, or whose result has no usable ``status``, is
        reported as a single ``"error"`` entry named after its identifier.
        """
        results = []
        overall_status = "healthy"
        worst_rank = self._STATUS_PRIORITY[overall_status]

        for check_name, check_func in self.checks.items():
            try:
                result = check_func()
                rank = self._STATUS_PRIORITY.get(result["status"], 0)
            except Exception as e:
                result = {
                    "name": check_name,
                    "status": "error",
                    "details": {"error": f"检查函数执行失败: {str(e)}"},
                }
                rank = self._STATUS_PRIORITY["error"]

            results.append(result)
            if rank > worst_rank:
                worst_rank = rank
                overall_status = result["status"]

        return {
            "timestamp": time.time(),
            "overall_status": overall_status,
            "checks": results,
        }

    def get_system_info(self) -> Dict[str, Any]:
        """Return basic host facts, or ``{"error": ...}`` if gathering them fails.

        Includes ``os.name``, the Python version, the CPU count, total
        memory and the total size of the filesystem mounted at ``/``.
        """
        import sys

        try:
            return {
                "platform": os.name,
                "python_version": ".".join(map(str, sys.version_info[:3])),
                "cpu_count": psutil.cpu_count(),
                "memory_total": self._format_gb(psutil.virtual_memory().total),
                "disk_total": self._format_gb(psutil.disk_usage('/').total),
            }
        except Exception as e:
            return {"error": str(e)}
# Module-level HealthChecker instance, created when this module is imported.
health_checker = HealthChecker()