Kamixitong/scripts/health_check.py
2025-12-12 11:35:14 +08:00

223 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
简单监控脚本
用于快速检查系统健康状态,不依赖外部工具
"""
import os
import sys
import time
import json
import requests
import psutil
from datetime import datetime
from pathlib import Path
class HealthChecker:
    """Run a series of health checks against a service and its host.

    Each ``check_*`` method prints its findings to stdout and returns
    ``True`` when the check passes.  ``generate_report`` runs all of them,
    prints a summary and stores the outcomes in ``self.results`` so that
    ``save_report`` can write them to a JSON file afterwards.
    """

    # Icon printed next to each resource metric, keyed by _classify() level.
    _LEVEL_ICONS = {'ok': '✅', 'warning': '⚠️', 'critical': '❌'}

    def __init__(self, base_url: str = 'http://localhost:5000') -> None:
        """Create a checker.

        Args:
            base_url: Root URL of the service whose ``/api/v1/health``
                endpoint is probed by ``check_service_health``.
        """
        self.base_url = base_url
        # Not read by this class; kept so existing callers that touch the
        # attribute keep working.
        self.checks = []
        # (name, passed) pairs from the most recent generate_report() run.
        self.results = []

    @staticmethod
    def _classify(value, warn_at, critical_at) -> str:
        """Map a metric onto ``'ok'``, ``'warning'`` or ``'critical'``.

        Args:
            value: Measured value.
            warn_at: Values at or above this are at least a warning.
            critical_at: Values at or above this are critical.
        """
        if value < warn_at:
            return 'ok'
        if value < critical_at:
            return 'warning'
        return 'critical'

    def check_service_health(self) -> bool:
        """Probe the service's health endpoint.

        Returns:
            ``True`` only when the endpoint answers HTTP 200 with a JSON
            object whose ``success`` field is truthy.
        """
        print("\n🔍 检查服务健康状态...")
        # Tolerate a trailing slash in base_url so the path never gets "//".
        url = f"{self.base_url.rstrip('/')}/api/v1/health"
        try:
            response = requests.get(url, timeout=5)
        except requests.RequestException as exc:
            print(f"❌ 无法连接到服务: {exc}")
            return False

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                # The service answered, but not with JSON: treat as unhealthy
                # rather than as a connection problem.
                data = None
            if isinstance(data, dict) and data.get('success'):
                print("✅ 服务健康检查通过")
                return True

        print("❌ 服务健康检查失败")
        return False

    def check_system_resources(self) -> bool:
        """Check CPU, memory, root-disk usage and the 1-minute load average.

        Every metric is printed together with an icon for its level.

        Returns:
            ``True`` only when every metric is below its warning threshold.
        """
        print("\n🔍 检查系统资源...")
        levels = []

        # CPU usage, sampled over one second.
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_level = self._classify(cpu_percent, 80, 90)
        print(f" CPU 使用率: {cpu_percent:.1f}% {self._LEVEL_ICONS[cpu_level]}")
        levels.append(cpu_level)

        # Memory usage.
        memory = psutil.virtual_memory()
        memory_level = self._classify(memory.percent, 85, 95)
        print(f" 内存使用率: {memory.percent:.1f}% {self._LEVEL_ICONS[memory_level]}")
        levels.append(memory_level)

        # Disk usage of the root filesystem.
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        disk_level = self._classify(disk_percent, 90, 95)
        print(f" 磁盘使用率: {disk_percent:.1f}% {self._LEVEL_ICONS[disk_level]}")
        levels.append(disk_level)

        # Load averages; only the 1-minute value is judged.
        load_avg = psutil.getloadavg()
        load_level = self._classify(load_avg[0], 2, 4)
        print(
            f" 系统负载: {load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}"
            f" {self._LEVEL_ICONS[load_level]}"
        )
        levels.append(load_level)

        return all(level == 'ok' for level in levels)

    def check_database_connection(self) -> bool:
        """Placeholder for a database connectivity check.

        No connection is attempted (it would need the Flask application
        context), so this always reports success.
        """
        print("\n🔍 检查数据库连接...")
        print("✅ 数据库连接检查(跳过,需要应用上下文)")
        return True

    def check_disk_space(self) -> bool:
        """Check that the root filesystem has more than 1 GiB free."""
        print("\n🔍 检查磁盘空间...")
        disk_usage = psutil.disk_usage('/')
        free_gb = disk_usage.free / (1024 ** 3)
        total_gb = disk_usage.total / (1024 ** 3)
        print(f" 可用空间: {free_gb:.2f} GB / {total_gb:.2f} GB")
        return free_gb > 1

    def check_process_status(self) -> bool:
        """Look for running processes whose name starts with a known prefix.

        The checker's own process is ignored; otherwise the ``python``
        prefix would match this script and the check could never fail.

        Returns:
            ``True`` when at least one matching process was found.
        """
        print("\n🔍 检查关键进程...")
        prefixes = ('python', 'flask', 'nginx', 'mysql', 'redis')
        own_pid = os.getpid()
        found = set()
        for proc in psutil.process_iter(['pid', 'name']):
            try:
                info = proc.info
                name = info.get('name')
                if not name or info.get('pid') == own_pid:
                    continue
                if name.lower().startswith(prefixes):
                    found.add(name)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Best effort: skip processes that vanished or are off-limits.
                continue
        # Sorted so the output is stable between runs.
        print(f" 发现进程: {', '.join(sorted(found))}")
        return bool(found)

    def check_log_files(self) -> bool:
        """Print the size of each known log file, or note that it is missing.

        Informational only: always returns ``True``.
        """
        print("\n🔍 检查日志文件...")
        # NOTE(review): 'kamaxitong' differs from the project name
        # 'KaMiXiTong' used elsewhere -- confirm the real log file name.
        log_files = [
            'logs/kamaxitong.log',
            '/var/log/nginx/access.log',
            '/var/log/nginx/error.log',
        ]
        for log_file in log_files:
            path = Path(log_file)
            if path.exists():
                size_mb = path.stat().st_size / (1024 * 1024)
                print(f" {log_file}: {size_mb:.2f} MB")
            else:
                print(f" {log_file}: 文件不存在")
        return True

    def generate_report(self) -> bool:
        """Run every check, print a summary and remember the results.

        A check that raises is reported and counted as failed, so one broken
        check never prevents the others from running.  The outcomes are
        stored in ``self.results`` as ``(name, passed)`` pairs.

        Returns:
            ``True`` when every check passed.
        """
        print("\n" + "=" * 60)
        print("📊 KaMiXiTong 系统健康报告")
        print("=" * 60)
        print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        checks = [
            ('服务健康', self.check_service_health),
            ('系统资源', self.check_system_resources),
            ('数据库连接', self.check_database_connection),
            ('磁盘空间', self.check_disk_space),
            ('进程状态', self.check_process_status),
            ('日志文件', self.check_log_files),
        ]
        results = []
        for name, check_func in checks:
            try:
                passed = check_func()
            except Exception as exc:  # boundary: isolate failures per check
                print(f"{name}检查失败: {exc}")
                passed = False
            results.append((name, passed))
        # Keep the outcomes so save_report() can serialise them later.
        self.results = results

        print("\n" + "=" * 60)
        print("📋 检查结果汇总")
        print("=" * 60)
        for name, passed in results:
            status = "✅ 通过" if passed else "❌ 失败"
            print(f"{name:20s}: {status}")
        all_passed = all(passed for _, passed in results)

        print("\n" + "=" * 60)
        if all_passed:
            print("✅ 系统健康状态良好")
        else:
            print("⚠️ 发现问题,请查看上述详细信息")
        print("=" * 60)
        return all_passed

    def save_report(self, filename='health_report.json') -> None:
        """Write the latest results and basic system info to a JSON file.

        Uses the results stored by the most recent ``generate_report`` call;
        if it has not been called yet, the ``checks`` list is empty.

        Args:
            filename: Path of the JSON file to (over)write.
        """
        saved_at = datetime.now().isoformat()
        report = {
            'timestamp': saved_at,
            'checks': [
                {'name': name, 'passed': passed, 'time': saved_at}
                for name, passed in self.results
            ],
            'system_info': {
                'cpu_count': psutil.cpu_count(),
                'memory_total': psutil.virtual_memory().total,
                'disk_total': psutil.disk_usage('/').total,
            },
        }
        # UTF-8 without ASCII escaping keeps the check names readable.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"\n📄 健康报告已保存到: {filename}")
def main():
    """Interactive entry point.

    Prompts for the service address (an empty answer selects the default),
    runs the full health report, optionally saves it to disk, and exits
    with status 0 when every check passed or 1 otherwise.
    """
    banner = "=" * 60
    print(banner)
    print("🔍 KaMiXiTong 系统健康检查")
    print(banner)

    # An empty answer falls back to the default address.
    entered = input("请输入服务地址 (默认: http://localhost:5000): ").strip()
    target = entered or 'http://localhost:5000'

    health = HealthChecker(target)
    healthy = health.generate_report()

    # Only an explicit "y" saves the report.
    answer = input("\n是否保存健康报告? (y/N): ").strip().lower()
    if answer == 'y':
        health.save_report()

    sys.exit(1 if not healthy else 0)
# Run the interactive check only when executed as a script, not on import.
if __name__ == "__main__":
    main()