from flask import request, jsonify, current_app from app import db from app.models import Admin, AuditLog from . import api_bp from flask_login import current_user, login_required from werkzeug.security import generate_password_hash import functools import re from .decorators import require_admin from app.utils.logger import log_operation # 响应码定义 class ResponseCode: SUCCESS = 0 VALIDATION_ERROR = 1001 AUTHENTICATION_FAILED = 1002 PERMISSION_DENIED = 1003 NOT_FOUND = 1004 SERVER_ERROR = 5000 DUPLICATE_USERNAME = 2001 INVALID_DATA = 2002 CANNOT_DELETE_SELF = 2003 CANNOT_DISABLE_SELF = 2004 OPERATION_FAILED = 3001 def create_response(success, data=None, message='', code=ResponseCode.SUCCESS, status_code=200): """统一的响应格式""" return jsonify({ 'success': success, 'data': data, 'message': message, 'code': code }), status_code def handle_exceptions(f): """异常处理装饰器""" @functools.wraps(f) def decorated_function(*args, **kwargs): try: return f(*args, **kwargs) except ValueError as e: current_app.logger.error(f"数据验证错误: {str(e)}") return create_response(False, message=str(e), code=ResponseCode.VALIDATION_ERROR, status_code=400) except Exception as e: db.session.rollback() current_app.logger.error(f"服务器错误: {str(e)}", exc_info=True) return create_response(False, message='服务器内部错误,请稍后重试', code=ResponseCode.SERVER_ERROR, status_code=500) return decorated_function def log_audit(action, target_type, target_id=None, details=None): """记录审计日志""" try: ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) user_agent = request.headers.get('User-Agent', '') AuditLog.log_action( admin_id=current_user.admin_id, action=action, target_type=target_type, target_id=target_id, details=details, ip_address=ip_address, user_agent=user_agent ) except Exception as e: current_app.logger.error(f"记录审计日志失败: {str(e)}") def validate_username(username): """验证用户名""" if not username or not username.strip(): return False, '用户名不能为空' if len(username.strip()) < 3: return False, '用户名至少3个字符' if len(username.strip()) > 32: return False, '用户名不能超过32个字符' if not re.match(r'^[a-zA-Z0-9_]+$', username.strip()): return False, '用户名只能包含字母、数字和下划线' return True, '' def validate_password(password): """验证密码强度""" if not password or not password.strip(): return False, '密码不能为空' if len(password) < 6: return False, '密码长度至少6位' if len(password) > 128: return False, '密码长度不能超过128个字符' return True, '' def validate_email(email): """验证邮箱""" if email and '@' in email: if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email): return False, '邮箱格式不正确' return True, '' def validate_admin_data(data, is_create=True): """验证管理员数据""" if not data: return False, '请求数据为空', ResponseCode.VALIDATION_ERROR # 验证用户名 username = data.get('username', '').strip() is_valid, msg = validate_username(username) if not is_valid: return False, msg, ResponseCode.VALIDATION_ERROR # 检查用户名是否已存在 if is_create: existing = Admin.query.filter(Admin.username == username, Admin.is_deleted == 0).first() if existing: return False, '用户名已存在', ResponseCode.DUPLICATE_USERNAME else: # 编辑时检查其他用户是否使用相同用户名 admin_id = data.get('admin_id') existing = Admin.query.filter( Admin.username == username, Admin.is_deleted == 0, Admin.admin_id != admin_id ).first() if existing: return False, '用户名已存在', ResponseCode.DUPLICATE_USERNAME # 验证邮箱 email = data.get('email', '').strip() is_valid, msg = validate_email(email) if not is_valid: return False, msg, ResponseCode.VALIDATION_ERROR # 验证密码 if is_create: password = data.get('password', '') is_valid, msg = validate_password(password) if not is_valid: return False, msg, ResponseCode.VALIDATION_ERROR else: # 编辑时,如果提供了密码则验证 password = data.get('password', '') if password: is_valid, msg = validate_password(password) if not is_valid: return False, msg, ResponseCode.VALIDATION_ERROR # 验证角色 role = data.get('role') if role is not None and role not in [0, 1]: return False, '角色值无效', ResponseCode.VALIDATION_ERROR # 验证状态 status = data.get('status') if status is not None and status not in [0, 1]: return False, '状态值无效', ResponseCode.VALIDATION_ERROR return True, '', ResponseCode.SUCCESS @api_bp.route('/admins', methods=['GET']) @require_admin @handle_exceptions def get_admins(): """获取管理员列表""" try: page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 10, type=int), 100) keyword = request.args.get('keyword', '').strip() role = request.args.get('role', type=int) if request.args.get('role') != '' else None status = request.args.get('status', type=int) if request.args.get('status') != '' else None # 使用 get_query() 方法获取未删除的查询 query = Admin.get_query() current_app.logger.debug(f"查询管理员列表 - page: {page}, per_page: {per_page}, keyword: {keyword}, role: {role}, status: {status}") # 关键词搜索 if keyword: query = query.filter(Admin.username.contains(keyword)) # 角色筛选 if role is not None: query = query.filter(Admin.role == role) # 状态筛选 if status is not None: query = query.filter(Admin.status == status) # 分页 pagination = query.order_by(Admin.create_time.desc()).paginate( page=page, per_page=per_page, error_out=False ) current_app.logger.debug(f"查询结果 - total: {pagination.total}, pages: {pagination.pages}, items: {len(pagination.items)}") admins = [admin.to_dict() for admin in pagination.items] return create_response(True, data={ 'admins': admins, 'pagination': { 'page': page, 'per_page': per_page, 'total': pagination.total, 'pages': pagination.pages, 'has_prev': pagination.has_prev, 'has_next': pagination.has_next } }) except Exception as e: current_app.logger.error(f"获取管理员列表失败: {str(e)}", exc_info=True) raise @api_bp.route('/admins', methods=['POST']) @require_admin @handle_exceptions def create_admin(): """创建管理员""" data = request.get_json() if not data: return create_response(False, message='请求数据为空', code=ResponseCode.VALIDATION_ERROR, status_code=400) # 验证数据 is_valid, message, code = validate_admin_data(data, is_create=True) if not is_valid: return create_response(False, message=message, code=code, status_code=400) username = data.get('username', '').strip() email = data.get('email', '').strip() role = data.get('role', 0) status = data.get('status', 1) password = data.get('password', '') try: # 创建管理员 admin = Admin( username=username, email=email, role=role, status=status ) admin.set_password(password) db.session.add(admin) db.session.commit() # 记录审计日志 log_audit('CREATE', 'ADMIN', admin.admin_id, { 'username': username, 'role': role, 'status': status }) # 记录操作日志 log_operation('CREATE_ADMIN', 'ADMIN', admin.admin_id, { 'username': username, 'role': role, 'status': status }) return create_response(True, data=admin.to_dict(), message='管理员创建成功') except Exception as e: db.session.rollback() current_app.logger.error(f"创建管理员失败: {str(e)}", exc_info=True) return create_response(False, message='创建管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500) @api_bp.route('/admins/', methods=['GET']) @require_admin @handle_exceptions def get_admin(admin_id): """获取管理员详情""" admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first() if not admin: return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404) return create_response(True, data=admin.to_dict()) @api_bp.route('/admins/', methods=['PUT']) @require_admin @handle_exceptions def update_admin(admin_id): """更新管理员""" admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first() if not admin: return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404) data = request.get_json() if not data: return create_response(False, message='请求数据为空', code=ResponseCode.VALIDATION_ERROR, status_code=400) # 验证数据 data['admin_id'] = admin_id is_valid, message, code = validate_admin_data(data, is_create=False) if not is_valid: return create_response(False, message=message, code=code, status_code=400) try: # 记录旧值 old_data = { 'email': admin.email, 'role': admin.role, 'status': admin.status } # 更新字段 if 'email' in data: admin.email = data['email'].strip() if 'role' in data: admin.role = data['role'] if 'status' in data: admin.status = data['status'] # 如果提供了密码,则更新密码 password = data.get('password', '') if password and password.strip(): admin.set_password(password) db.session.commit() # 记录审计日志 log_audit('UPDATE', 'ADMIN', admin_id, { 'old': old_data, 'new': { 'email': admin.email, 'role': admin.role, 'status': admin.status } }) # 记录操作日志 log_operation('UPDATE_ADMIN', 'ADMIN', admin_id, { 'old': old_data, 'new': { 'email': admin.email, 'role': admin.role, 'status': admin.status } }) return create_response(True, data=admin.to_dict(), message='管理员更新成功') except Exception as e: db.session.rollback() current_app.logger.error(f"更新管理员失败: {str(e)}", exc_info=True) return create_response(False, message='更新管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500) @api_bp.route('/admins//toggle-status', methods=['POST']) @require_admin @handle_exceptions def toggle_admin_status(admin_id): """切换管理员状态""" admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first() if not admin: return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404) # 不允许禁用自己 if current_user.admin_id == admin_id and admin.status == 1: return create_response(False, message='不能禁用当前登录的管理员', code=ResponseCode.CANNOT_DISABLE_SELF, status_code=400) try: old_status = admin.status admin.status = 0 if admin.status == 1 else 1 db.session.commit() status_name = '正常' if admin.status == 1 else '禁用' action = '启用' if admin.status == 1 else '禁用' # 记录审计日志 log_audit('TOGGLE_STATUS', 'ADMIN', admin_id, { 'old_status': old_status, 'new_status': admin.status }) # 记录操作日志 log_operation('TOGGLE_ADMIN_STATUS', 'ADMIN', admin_id, { 'old_status': old_status, 'new_status': admin.status }) return create_response(True, data={ 'status': admin.status, 'status_name': status_name }, message=f'管理员已{action}') except Exception as e: db.session.rollback() current_app.logger.error(f"切换管理员状态失败: {str(e)}", exc_info=True) return create_response(False, message='切换状态失败', code=ResponseCode.OPERATION_FAILED, status_code=500) @api_bp.route('/admins/', methods=['DELETE']) @require_admin @handle_exceptions def delete_admin(admin_id): """删除管理员(软删除)""" admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first() if not admin: return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404) # 不允许删除自己 if current_user.admin_id == admin_id: return create_response(False, message='不能删除当前登录的管理员', code=ResponseCode.CANNOT_DELETE_SELF, status_code=400) try: # 软删除 admin.soft_delete() db.session.commit() # 记录审计日志 log_audit('DELETE', 'ADMIN', admin_id, { 'username': admin.username }) # 记录操作日志 log_operation('DELETE_ADMIN', 'ADMIN', admin_id, { 'username': admin.username }) return create_response(True, message='管理员删除成功') except Exception as e: db.session.rollback() current_app.logger.error(f"删除管理员失败: {str(e)}", exc_info=True) return create_response(False, message='删除管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500)