#!/usr/bin/env python3
"""Simple monitoring script.

Quickly checks system health without relying on external tools.
"""

import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path

import psutil
import requests


class HealthChecker:
    """Run a fixed set of health checks and print/save a summary report.

    Each ``check_*`` method prints its findings to stdout and returns a
    boolean: ``True`` when the check passed, ``False`` otherwise.
    """

    def __init__(self, base_url='http://localhost:5000'):
        """Create a checker.

        Args:
            base_url: Root URL of the service whose ``/api/v1/health``
                endpoint is probed by ``check_service_health``.
        """
        self.base_url = base_url
        self.checks = []
        # (name, passed) pairs from the most recent generate_report() run.
        # Initialised here so save_report() works even before a report ran.
        self.results = []

    @staticmethod
    def _grade(value, warn_at, critical_at):
        """Map a metric to a status emoji using two ascending thresholds."""
        if value < warn_at:
            return "✅"
        if value < critical_at:
            return "⚠️"
        return "❌"

    def check_service_health(self):
        """Probe the service health endpoint.

        Returns:
            True only when the endpoint answers HTTP 200 with a JSON object
            whose ``success`` field is truthy; False on any other response
            or when the request / JSON decoding fails.
        """
        print("\n🔍 检查服务健康状态...")
        # Tolerate a user-supplied base URL that ends with a slash.
        url = f"{self.base_url.rstrip('/')}/api/v1/health"
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                data = response.json()
                # Guard against a non-object JSON payload (e.g. a list).
                if isinstance(data, dict) and data.get('success'):
                    print("✅ 服务健康检查通过")
                    return True
            print("❌ 服务健康检查失败")
            return False
        except (requests.RequestException, ValueError) as e:
            # ValueError covers an undecodable JSON body.
            print(f"❌ 无法连接到服务: {str(e)}")
            return False

    def check_system_resources(self):
        """Print CPU, memory, disk and load figures.

        Returns:
            True only when every metric is below its warning threshold
            (CPU < 80%, memory < 85%, disk < 90%, 1-minute load < 2).
            Warning and critical levels both count as a failure.
        """
        print("\n🔍 检查系统资源...")

        # CPU usage (blocks for the 1-second sampling interval).
        cpu_percent = psutil.cpu_percent(interval=1)
        print(f" CPU 使用率: {cpu_percent:.1f}%")
        cpu_status = self._grade(cpu_percent, 80, 90)

        # Memory usage.
        memory = psutil.virtual_memory()
        print(f" 内存使用率: {memory.percent:.1f}%")
        memory_status = self._grade(memory.percent, 85, 95)

        # Disk usage of the root filesystem.
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        print(f" 磁盘使用率: {disk_percent:.1f}%")
        disk_status = self._grade(disk_percent, 90, 95)

        # Load averages (1, 5, 15 minutes); only the 1-minute value is graded.
        load_avg = psutil.getloadavg()
        print(f" 系统负载: {load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}")
        load_status = self._grade(load_avg[0], 2, 4)

        statuses = (cpu_status, memory_status, disk_status, load_status)
        return all(status == "✅" for status in statuses)

    def check_database_connection(self):
        """Placeholder database check; always reports success.

        Returns:
            True. No connection is attempted.
        """
        print("\n🔍 检查数据库连接...")
        # TODO: add a real connection check. It is skipped for now because
        # it would require importing the Flask application.
        print("✅ 数据库连接检查(跳过,需要应用上下文)")
        return True

    def check_disk_space(self):
        """Print free/total space of the root filesystem.

        Returns:
            True when more than 1 GiB is free.
        """
        print("\n🔍 检查磁盘空间...")
        disk_usage = psutil.disk_usage('/')
        free_gb = disk_usage.free / (1024 ** 3)
        total_gb = disk_usage.total / (1024 ** 3)
        print(f" 可用空间: {free_gb:.2f} GB / {total_gb:.2f} GB")
        return free_gb > 1  # at least 1 GB of free space is required

    def check_process_status(self):
        """Look for running processes whose name starts with a known prefix.

        Returns:
            True when at least one matching process was found.
        """
        print("\n🔍 检查关键进程...")
        # str.startswith accepts a tuple, so one call tests every prefix.
        prefixes = ('python', 'flask', 'nginx', 'mysql', 'redis')
        found_processes = []
        # Only the name is needed; 'cmdline' was fetched but never used.
        for proc in psutil.process_iter(['pid', 'name']):
            try:
                name = proc.info['name']
                if name and name.lower().startswith(prefixes):
                    found_processes.append(name)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is not inspectable; skip it.
                pass
        # Sorted so the printed list is deterministic between runs.
        print(f" 发现进程: {', '.join(sorted(set(found_processes)))}")
        return len(found_processes) > 0

    def check_log_files(self):
        """Print the size of each known log file, or note that it is missing.

        Returns:
            True always; this check is informational only.
        """
        print("\n🔍 检查日志文件...")
        log_files = [
            'logs/kamaxitong.log',
            '/var/log/nginx/access.log',
            '/var/log/nginx/error.log',
        ]
        for log_file in log_files:
            path = Path(log_file)
            if path.exists():
                size_mb = path.stat().st_size / (1024 * 1024)
                print(f" {log_file}: {size_mb:.2f} MB")
            else:
                print(f" {log_file}: 文件不存在")
        return True

    def generate_report(self):
        """Run every check, print a summary, and remember the results.

        The (name, passed) pairs are stored on ``self.results`` so that
        ``save_report`` can serialise them afterwards.

        Returns:
            True when every check passed, False otherwise.
        """
        print("\n" + "=" * 60)
        print("📊 KaMiXiTong 系统健康报告")
        print("=" * 60)
        print(f"检查时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        # Run all checks.
        checks = [
            ('服务健康', self.check_service_health),
            ('系统资源', self.check_system_resources),
            ('数据库连接', self.check_database_connection),
            ('磁盘空间', self.check_disk_space),
            ('进程状态', self.check_process_status),
            ('日志文件', self.check_log_files),
        ]

        results = []
        for name, check_func in checks:
            try:
                result = check_func()
                results.append((name, result))
            except Exception as e:
                # Top-level boundary: one failing check must not abort the
                # remaining ones, so record it as failed and continue.
                print(f"❌ {name}检查失败: {str(e)}")
                results.append((name, False))

        # Keep the outcome for save_report(); previously it was discarded,
        # which made save_report() fail with AttributeError.
        self.results = results

        # Print the summary.
        print("\n" + "=" * 60)
        print("📋 检查结果汇总")
        print("=" * 60)

        for name, passed in results:
            status = "✅ 通过" if passed else "❌ 失败"
            print(f"{name:20s}: {status}")
        all_passed = all(passed for _, passed in results)

        print("\n" + "=" * 60)
        if all_passed:
            print("✅ 系统健康状态良好")
        else:
            print("⚠️ 发现问题,请查看上述详细信息")
        print("=" * 60)

        return all_passed

    def save_report(self, filename='health_report.json'):
        """Write the most recent check results to a JSON file.

        Args:
            filename: Destination path of the JSON report.

        The ``checks`` list is empty when ``generate_report`` has not been
        run yet.
        """
        now = datetime.now().isoformat()
        report = {
            'timestamp': now,
            'checks': [
                {
                    'name': name,
                    'passed': passed,
                    'time': now,
                }
                for name, passed in self.results
            ],
            'system_info': {
                'cpu_count': psutil.cpu_count(),
                'memory_total': psutil.virtual_memory().total,
                'disk_total': psutil.disk_usage('/').total,
            },
        }

        # Explicit UTF-8 plus ensure_ascii=False keeps the check names
        # readable in the file regardless of the platform default encoding.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        print(f"\n📄 健康报告已保存到: {filename}")


def _prompt(message, default=''):
    """Read a stripped line from stdin, or return *default* if stdin is closed."""
    try:
        return input(message).strip()
    except EOFError:
        # Non-interactive run (cron, CI): fall back instead of crashing.
        return default


def main():
    """Entry point: ask for the service URL, run the report, optionally save it.

    Exits with status 0 when every check passed and 1 otherwise.
    """
    print("=" * 60)
    print("🔍 KaMiXiTong 系统健康检查")
    print("=" * 60)

    # Ask for the base URL; empty input selects the default.
    base_url = _prompt("请输入服务地址 (默认: http://localhost:5000): ")
    if not base_url:
        base_url = 'http://localhost:5000'

    checker = HealthChecker(base_url)
    passed = checker.generate_report()

    # Optionally save the report.
    save = _prompt("\n是否保存健康报告? (y/N): ").lower()
    if save == 'y':
        checker.save_report()

    sys.exit(0 if passed else 1)


if __name__ == '__main__':
    main()