nodebookls/health_check.py
2025-10-29 13:56:24 +08:00

262 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
系统健康检查模块
提供系统状态监控和健康检查功能
"""
import os
import psutil
import time
from typing import Dict, Any, List
from config import settings
from openai import OpenAI
class HealthChecker:
    """Run a fixed set of system health probes and aggregate their results.

    Every probe returns a dict with three keys: ``name`` (display name),
    ``status`` (``"healthy"``, ``"warning"``, ``"critical"`` or ``"error"``)
    and ``details`` (probe-specific data or an error message).
    """

    # Severity ranking used to pick the overall status (higher is worse).
    _STATUS_PRIORITY = {"error": 4, "critical": 3, "warning": 2, "healthy": 1}

    def __init__(self):
        # Maps a check identifier to the bound method that runs it.
        # Insertion order is the order in which checks are executed.
        self.checks = {
            "disk_space": self._check_disk_space,
            "memory_usage": self._check_memory_usage,
            "cpu_usage": self._check_cpu_usage,
            "openai_api": self._check_openai_api,
            "redis_connection": self._check_redis_connection,
            "file_permissions": self._check_file_permissions,
        }

    @staticmethod
    def _status_below(value, warning_at, critical_at) -> str:
        """Grade a "lower is better" metric against two thresholds.

        Returns ``"healthy"`` when ``value < warning_at``, ``"warning"`` when
        ``value < critical_at`` and ``"critical"`` otherwise.
        """
        if value < warning_at:
            return "healthy"
        if value < critical_at:
            return "warning"
        return "critical"

    @staticmethod
    def _format_gb(num_bytes) -> str:
        """Format a byte count as gibibytes with two decimals, e.g. ``"1.50 GB"``."""
        return f"{num_bytes / (1024**3):.2f} GB"

    @staticmethod
    def _mask_url_credentials(url):
        """Return ``url`` with any password in its userinfo replaced by ``***``.

        URLs without a password, and non-string values, are returned unchanged.
        If the URL cannot be parsed, ``"***"`` is returned so that a secret is
        never echoed back in the health report.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import urlsplit, urlunsplit

        if not isinstance(url, str):
            return url
        try:
            parts = urlsplit(url)
        except ValueError:
            return "***"
        userinfo, sep, hostport = parts.netloc.rpartition("@")
        if not sep or ":" not in userinfo:
            # No userinfo at all, or a user name without a password.
            return url
        user = userinfo.partition(":")[0]
        return urlunsplit(parts._replace(netloc=f"{user}:***@{hostport}"))

    def _check_disk_space(self) -> Dict[str, Any]:
        """Check free space on the filesystem mounted at ``/``.

        Healthy above 10% free, warning above 5% free, critical otherwise.
        """
        try:
            disk = psutil.disk_usage('/')
            free_percent = (disk.free / disk.total) * 100
            if free_percent > 10:
                status = "healthy"
            elif free_percent > 5:
                status = "warning"
            else:
                status = "critical"
            return {
                "name": "磁盘空间",
                "status": status,
                "details": {
                    "total": self._format_gb(disk.total),
                    "used": self._format_gb(disk.used),
                    "free": self._format_gb(disk.free),
                    "free_percent": f"{free_percent:.2f}%",
                },
            }
        except Exception as e:
            return {
                "name": "磁盘空间",
                "status": "error",
                "details": {"error": str(e)},
            }

    def _check_memory_usage(self) -> Dict[str, Any]:
        """Check virtual memory usage.

        Healthy below 80% used, warning below 90% used, critical otherwise.
        """
        try:
            memory = psutil.virtual_memory()
            usage_percent = memory.percent
            return {
                "name": "内存使用",
                "status": self._status_below(usage_percent, 80, 90),
                "details": {
                    "total": self._format_gb(memory.total),
                    "available": self._format_gb(memory.available),
                    "used": self._format_gb(memory.used),
                    "usage_percent": f"{usage_percent:.2f}%",
                },
            }
        except Exception as e:
            return {
                "name": "内存使用",
                "status": "error",
                "details": {"error": str(e)},
            }

    def _check_cpu_usage(self) -> Dict[str, Any]:
        """Check CPU utilisation sampled over one second.

        Note that this call blocks for the one-second sampling interval.
        Healthy below 80%, warning below 90%, critical otherwise.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            return {
                "name": "CPU使用",
                "status": self._status_below(cpu_percent, 80, 90),
                "details": {
                    "usage_percent": f"{cpu_percent:.2f}%",
                    "core_count": psutil.cpu_count(),
                },
            }
        except Exception as e:
            return {
                "name": "CPU使用",
                "status": "error",
                "details": {"error": str(e)},
            }

    def _check_openai_api(self) -> Dict[str, Any]:
        """Check connectivity to the configured embedding provider's API.

        The provider is selected by ``settings.EMBEDDING_PROVIDER``
        (``"openai"``, ``"anthropic"`` or ``"qwen"``). The probe lists the
        available models through the ``OpenAI`` client and grades the
        response time: healthy below 1000 ms, warning below 3000 ms,
        critical otherwise. A missing API key yields a warning.
        """
        # Resolved before anything can fail so the error branch can name the
        # check without touching ``settings`` again.
        provider_name = "Unknown"
        try:
            provider = settings.EMBEDDING_PROVIDER
            api_key = None
            api_base = None
            # The display name is assigned first in each branch so that it is
            # already correct if reading the key or base URL raises.
            if provider == "openai":
                provider_name = "OpenAI"
                api_key = settings.OPENAI_API_KEY
                api_base = settings.OPENAI_API_BASE
            elif provider == "anthropic":
                provider_name = "Anthropic"
                api_key = settings.ANTHROPIC_API_KEY
                api_base = settings.ANTHROPIC_API_BASE
            elif provider == "qwen":
                provider_name = "通义千问"
                api_key = settings.QWEN_API_KEY
                api_base = settings.QWEN_API_BASE

            if not api_key:
                return {
                    "name": f"{provider_name} API",
                    "status": "warning",
                    "details": {"message": f"{provider_name} API密钥未配置"},
                }

            client = OpenAI(api_key=api_key, base_url=api_base)
            # Listing models is used as a cheap request to verify the connection.
            started = time.perf_counter()
            models = client.models.list()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds
            return {
                "name": f"{provider_name} API",
                "status": self._status_below(response_time, 1000, 3000),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    "models_available": len(list(models)),
                    "provider": provider,
                    "api_base": api_base,
                },
            }
        except Exception as e:
            return {
                "name": f"{provider_name} API",
                "status": "error",
                "details": {"error": str(e)},
            }

    def _check_redis_connection(self) -> Dict[str, Any]:
        """Check that the Redis broker at ``settings.CELERY_BROKER_URL`` answers PING.

        Healthy below 100 ms, warning below 500 ms, critical otherwise.
        Returns a warning when the ``redis`` package is not installed. The
        broker URL is reported with its password masked.
        """
        try:
            import redis
        except ImportError:
            return {
                "name": "Redis连接",
                "status": "warning",
                "details": {"message": "Redis库未安装"},
            }

        client = None
        try:
            started = time.perf_counter()
            client = redis.Redis.from_url(settings.CELERY_BROKER_URL)
            client.ping()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds
            return {
                "name": "Redis连接",
                "status": self._status_below(response_time, 100, 500),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    "broker_url": self._mask_url_credentials(settings.CELERY_BROKER_URL),
                },
            }
        except Exception as e:
            return {
                "name": "Redis连接",
                "status": "error",
                "details": {"error": str(e)},
            }
        finally:
            if client is not None:
                try:
                    # Release the sockets opened for this probe.
                    client.connection_pool.disconnect()
                except Exception:
                    # Best-effort cleanup; the probe result is already decided.
                    pass

    def _check_file_permissions(self) -> Dict[str, Any]:
        """Check that the working directories exist and are readable and writable.

        Paths are relative, so they are resolved against the current working
        directory of the process. Any problem downgrades the status to warning.
        """
        try:
            required_paths = [".", "uploads", "exports", "logs"]
            issues = []
            for path in required_paths:
                if not os.path.exists(path):
                    issues.append(f"路径不存在: {path}")
                elif not os.access(path, os.R_OK | os.W_OK):
                    issues.append(f"权限不足: {path}")
            return {
                "name": "文件权限",
                "status": "healthy" if not issues else "warning",
                "details": {
                    # A list of problems, or a fixed message when there are none.
                    "issues": issues if issues else "所有路径权限正常",
                },
            }
        except Exception as e:
            return {
                "name": "文件权限",
                "status": "error",
                "details": {"error": str(e)},
            }

    def run_health_check(self) -> Dict[str, Any]:
        """Run every registered check and aggregate the results.

        Returns a dict with ``timestamp`` (seconds since the epoch),
        ``overall_status`` (the most severe status seen, ranked
        error > critical > warning > healthy) and ``checks`` (one result dict
        per registered check, in registration order). A check that raises, or
        that returns a result without a ``status``, is reported as a single
        ``"error"`` entry named after its identifier.
        """
        results = []
        overall_status = "healthy"
        worst_rank = self._STATUS_PRIORITY[overall_status]
        for check_name, check_func in self.checks.items():
            try:
                result = check_func()
                # Unknown status strings rank 0 and never change the overall status.
                rank = self._STATUS_PRIORITY.get(result["status"], 0)
            except Exception as e:
                result = {
                    "name": check_name,
                    "status": "error",
                    "details": {"error": f"检查函数执行失败: {str(e)}"},
                }
                rank = self._STATUS_PRIORITY["error"]
            results.append(result)
            if rank > worst_rank:
                worst_rank = rank
                overall_status = result["status"]
        return {
            "timestamp": time.time(),
            "overall_status": overall_status,
            "checks": results,
        }

    def get_system_info(self) -> Dict[str, Any]:
        """Return basic platform facts: OS name, Python version, CPU count, memory and disk size.

        On any failure a dict with a single ``error`` key is returned instead.
        """
        # Local import keeps this block self-contained.
        import sys

        try:
            return {
                "platform": os.name,
                "python_version": ".".join(map(str, sys.version_info[:3])),
                "cpu_count": psutil.cpu_count(),
                "memory_total": self._format_gb(psutil.virtual_memory().total),
                "disk_total": self._format_gb(psutil.disk_usage('/').total),
            }
        except Exception as e:
            return {"error": str(e)}
# Create the module-level health checker instance
health_checker = HealthChecker()