"""
System health check module.

Provides system status monitoring and health check functionality.
"""

import os
import sys
import time
from typing import Dict, Any, List
from urllib.parse import urlsplit, urlunsplit

import psutil
from openai import OpenAI

from config import settings


class HealthChecker:
    """Run a set of system health checks and aggregate their results.

    Each check is a zero-argument method returning a dict with three keys:
    ``name`` (display label), ``status`` (``"healthy"``, ``"warning"``,
    ``"critical"`` or ``"error"``) and ``details`` (check-specific data).
    ``run_health_check`` executes whatever is registered in ``self.checks``.
    """

    # Severity ranking used to fold individual statuses into the overall one.
    _STATUS_PRIORITY = {"error": 4, "critical": 3, "warning": 2, "healthy": 1}

    # Upper bounds (seconds) so an unresponsive dependency cannot stall the
    # whole health check.
    _OPENAI_TIMEOUT_SECONDS = 10.0
    _REDIS_TIMEOUT_SECONDS = 5.0

    # provider id -> (settings attribute prefix, display name)
    _PROVIDERS = {
        "openai": ("OPENAI", "OpenAI"),
        "anthropic": ("ANTHROPIC", "Anthropic"),
        "qwen": ("QWEN", "通义千问"),
    }

    def __init__(self):
        """Register the built-in checks under their identifiers."""
        self.checks = {
            "disk_space": self._check_disk_space,
            "memory_usage": self._check_memory_usage,
            "cpu_usage": self._check_cpu_usage,
            "openai_api": self._check_openai_api,
            "redis_connection": self._check_redis_connection,
            "file_permissions": self._check_file_permissions,
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _error_result(name: str, exc: Exception) -> Dict[str, Any]:
        """Build the standard result for a check that raised *exc*."""
        return {"name": name, "status": "error", "details": {"error": str(exc)}}

    @staticmethod
    def _grade_below(value: float, warn_at: float, critical_at: float) -> str:
        """Grade a "lower is better" metric against two thresholds.

        Returns "healthy" when ``value < warn_at``, "warning" when
        ``value < critical_at``, otherwise "critical".
        """
        if value < warn_at:
            return "healthy"
        if value < critical_at:
            return "warning"
        return "critical"

    @staticmethod
    def _format_gib(num_bytes: float) -> str:
        """Format a byte count as gibibytes with two decimals and a "GB" suffix."""
        return f"{num_bytes / (1024**3):.2f} GB"

    @staticmethod
    def _mask_url(url: str) -> str:
        """Return *url* with any embedded password replaced by ``***``.

        URLs without a password are returned unchanged.
        """
        parts = urlsplit(url)
        if not parts.password:
            return url
        # rpartition keeps the host intact even if the password contains "@".
        userinfo, _, hostport = parts.netloc.rpartition("@")
        user = userinfo.partition(":")[0]
        return urlunsplit(parts._replace(netloc=f"{user}:***@{hostport}"))

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------

    def _check_disk_space(self) -> Dict[str, Any]:
        """Check free space on the filesystem containing ``/``.

        Returns:
            A result dict. Status is "healthy" above 10% free, "warning"
            above 5% free, otherwise "critical"; "error" if anything raises.
        """
        name = "磁盘空间"
        try:
            disk = psutil.disk_usage('/')
            free_percent = (disk.free / disk.total) * 100

            if free_percent > 10:
                status = "healthy"
            elif free_percent > 5:
                status = "warning"
            else:
                status = "critical"

            return {
                "name": name,
                "status": status,
                "details": {
                    "total": self._format_gib(disk.total),
                    "used": self._format_gib(disk.used),
                    "free": self._format_gib(disk.free),
                    "free_percent": f"{free_percent:.2f}%",
                },
            }
        except Exception as exc:
            return self._error_result(name, exc)

    def _check_memory_usage(self) -> Dict[str, Any]:
        """Check virtual memory utilisation.

        Returns:
            A result dict. Status is "healthy" below 80% used, "warning"
            below 90%, otherwise "critical"; "error" if anything raises.
        """
        name = "内存使用"
        try:
            memory = psutil.virtual_memory()
            usage_percent = memory.percent

            return {
                "name": name,
                "status": self._grade_below(usage_percent, 80, 90),
                "details": {
                    "total": self._format_gib(memory.total),
                    "available": self._format_gib(memory.available),
                    "used": self._format_gib(memory.used),
                    "usage_percent": f"{usage_percent:.2f}%",
                },
            }
        except Exception as exc:
            return self._error_result(name, exc)

    def _check_cpu_usage(self) -> Dict[str, Any]:
        """Check CPU utilisation sampled over one second.

        Note that the sampling call blocks for the one-second interval.

        Returns:
            A result dict. Status is "healthy" below 80%, "warning" below
            90%, otherwise "critical"; "error" if anything raises.
        """
        name = "CPU使用"
        try:
            cpu_percent = psutil.cpu_percent(interval=1)

            return {
                "name": name,
                "status": self._grade_below(cpu_percent, 80, 90),
                "details": {
                    "usage_percent": f"{cpu_percent:.2f}%",
                    "core_count": psutil.cpu_count(),
                },
            }
        except Exception as exc:
            return self._error_result(name, exc)

    def _check_openai_api(self) -> Dict[str, Any]:
        """Check connectivity to the configured embedding provider's API.

        The provider is selected by ``settings.EMBEDDING_PROVIDER``; its API
        key and base URL are read from the matching ``settings`` attributes.
        Connectivity is verified by listing models through the OpenAI client.

        Returns:
            A result dict. Status is "warning" when no API key is configured;
            otherwise it is graded on the model-list latency ("healthy" below
            1000 ms, "warning" below 3000 ms, else "critical"); "error" if
            anything raises.
        """
        # Captured before anything can fail so the error handler never has to
        # touch ``settings`` again (which could itself raise).
        provider_key = "unknown"
        try:
            provider_key = settings.EMBEDDING_PROVIDER

            api_key = None
            api_base = None
            provider_name = "Unknown"

            provider = self._PROVIDERS.get(provider_key)
            if provider is not None:
                prefix, provider_name = provider
                # Only the selected provider's settings are read.
                api_key = getattr(settings, f"{prefix}_API_KEY")
                api_base = getattr(settings, f"{prefix}_API_BASE")

            if not api_key:
                return {
                    "name": f"{provider_name} API",
                    "status": "warning",
                    "details": {"message": f"{provider_name} API密钥未配置"},
                }

            client = OpenAI(
                api_key=api_key,
                base_url=api_base,
                timeout=self._OPENAI_TIMEOUT_SECONDS,
            )

            # Listing models is used as the connectivity probe; a monotonic
            # clock keeps the latency immune to wall-clock adjustments.
            started = time.perf_counter()
            models = client.models.list()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds

            return {
                "name": f"{provider_name} API",
                "status": self._grade_below(response_time, 1000, 3000),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    "models_available": len(list(models)),
                    "provider": provider_key,
                    "api_base": api_base,
                },
            }
        except Exception as exc:
            return self._error_result(f"{str(provider_key).upper()} API", exc)

    def _check_redis_connection(self) -> Dict[str, Any]:
        """Check that the Redis instance at ``settings.CELERY_BROKER_URL`` answers PING.

        Returns:
            A result dict. Status is "warning" when the ``redis`` package is
            not installed; otherwise it is graded on connect+ping latency
            ("healthy" below 100 ms, "warning" below 500 ms, else
            "critical"); "error" if anything raises. The reported broker URL
            has any password masked.
        """
        name = "Redis连接"
        try:
            import redis
        except ImportError:
            return {
                "name": name,
                "status": "warning",
                "details": {"message": "Redis库未安装"},
            }

        client = None
        try:
            broker_url = settings.CELERY_BROKER_URL

            started = time.perf_counter()
            client = redis.Redis.from_url(
                broker_url,
                socket_timeout=self._REDIS_TIMEOUT_SECONDS,
            )
            client.ping()
            response_time = (time.perf_counter() - started) * 1000  # milliseconds

            return {
                "name": name,
                "status": self._grade_below(response_time, 100, 500),
                "details": {
                    "response_time_ms": f"{response_time:.2f}",
                    # Never expose broker credentials in health output.
                    "broker_url": self._mask_url(broker_url),
                },
            }
        except Exception as exc:
            return self._error_result(name, exc)
        finally:
            # Release the probe's sockets instead of leaving them to the GC.
            if client is not None:
                client.connection_pool.disconnect()

    def _check_file_permissions(self) -> Dict[str, Any]:
        """Check that the working directories exist and are readable and writable.

        The paths ``.``, ``uploads``, ``exports`` and ``logs`` are resolved
        relative to the current working directory.

        Returns:
            A result dict. Status is "healthy" when no issue is found,
            "warning" otherwise; "error" if anything raises. ``details``
            holds the list of issues, or a fixed message when there are none.
        """
        name = "文件权限"
        try:
            required_paths = [".", "uploads", "exports", "logs"]

            issues = []
            for path in required_paths:
                if not os.path.exists(path):
                    issues.append(f"路径不存在: {path}")
                elif not os.access(path, os.R_OK | os.W_OK):
                    issues.append(f"权限不足: {path}")

            return {
                "name": name,
                "status": "warning" if issues else "healthy",
                "details": {
                    "issues": issues if issues else "所有路径权限正常",
                },
            }
        except Exception as exc:
            return self._error_result(name, exc)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def run_health_check(self) -> Dict[str, Any]:
        """Run every registered check and aggregate the results.

        A check that raises, or that returns something without a ``status``
        key, is reported as a single "error" entry named after its check id.

        Returns:
            A dict with ``timestamp`` (``time.time()`` at completion),
            ``overall_status`` (the most severe status seen, ranked
            error > critical > warning > healthy) and ``checks`` (the list of
            individual results in registration order).
        """
        priority = self._STATUS_PRIORITY
        results = []
        overall_status = "healthy"

        for check_name, check_func in self.checks.items():
            try:
                result = check_func()
                status = result["status"]
            except Exception as exc:
                result = {
                    "name": check_name,
                    "status": "error",
                    "details": {"error": f"检查函数执行失败: {exc}"},
                }
                status = "error"

            results.append(result)
            # Unknown statuses rank 0 and therefore never raise the overall level.
            if priority.get(status, 0) > priority.get(overall_status, 0):
                overall_status = status

        return {
            "timestamp": time.time(),
            "overall_status": overall_status,
            "checks": results,
        }

    def get_system_info(self) -> Dict[str, Any]:
        """Collect basic information about the host and interpreter.

        Returns:
            A dict with ``platform`` (``os.name``), ``python_version``
            (``major.minor.micro``), ``cpu_count``, ``memory_total`` and
            ``disk_total``; or ``{"error": message}`` if anything raises.
        """
        try:
            return {
                "platform": os.name,
                "python_version": ".".join(map(str, sys.version_info[:3])),
                "cpu_count": psutil.cpu_count(),
                "memory_total": self._format_gib(psutil.virtual_memory().total),
                "disk_total": self._format_gib(psutil.disk_usage('/').total),
            }
        except Exception as exc:
            return {"error": str(exc)}


# Module-level HealthChecker instance.
health_checker = HealthChecker()