from datetime import datetime from app import db import json class AuditLog(db.Model): """审计日志模型""" __tablename__ = 'audit_log' log_id = db.Column(db.Integer, primary_key=True) admin_id = db.Column(db.Integer, db.ForeignKey('admin.admin_id'), nullable=False) action = db.Column(db.String(32), nullable=False) # 操作类型:CREATE, UPDATE, DELETE, LOGIN, LOGOUT, TOGGLE_STATUS target_type = db.Column(db.String(32), nullable=False) # 目标类型:ADMIN, PRODUCT, LICENSE等 target_id = db.Column(db.String(32), nullable=True) # 目标ID(修改为字符串类型以支持不同类型的ID) details = db.Column(db.Text, nullable=True) # 操作详情(JSON格式) ip_address = db.Column(db.String(32), nullable=True) # 操作IP地址 user_agent = db.Column(db.String(256), nullable=True) # 用户代理 create_time = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) # 关联管理员 admin = db.relationship('Admin', backref=db.backref('audit_logs', lazy='dynamic')) def __repr__(self): return f'' def to_dict(self): """转换为字典""" # 解析details字段中的JSON字符串 details_data = None if self.details: try: details_data = json.loads(self.details) except (json.JSONDecodeError, TypeError): details_data = self.details # 如果解析失败,返回原始字符串 return { 'log_id': self.log_id, 'admin_id': self.admin_id, 'admin_username': self.admin.username if self.admin else None, 'action': self.action, 'target_type': self.target_type, 'target_id': self.target_id, 'details': details_data, 'ip_address': self.ip_address, 'user_agent': self.user_agent, 'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None } @staticmethod def log_action(admin_id, action, target_type, target_id=None, details=None, ip_address=None, user_agent=None): """记录审计日志""" from flask import current_app try: # 将details字典序列化为JSON字符串 details_str = None if details is not None: if isinstance(details, dict): details_str = json.dumps(details, ensure_ascii=False) else: details_str = str(details) log = AuditLog( admin_id=admin_id if admin_id is not None else 0, # 确保admin_id不为None action=action, target_type=target_type, target_id=target_id, details=details_str, ip_address=ip_address, user_agent=user_agent ) db.session.add(log) db.session.commit() return True except Exception as e: db.session.rollback() if hasattr(current_app, 'logger'): current_app.logger.error(f"记录审计日志失败: {str(e)}") return False