filesend/backend/app/routes/admin.py

1630 lines
52 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify, send_from_directory, send_file
from flask_jwt_extended import jwt_required, get_jwt, get_jwt_identity
from flask import request, Blueprint, current_app, jsonify
from ..models import User, File, Download, SystemSettings, Category, OperationLog, CategoryPermission, TeamMember, db
import os
from datetime import datetime, timedelta
import hashlib
admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
from functools import wraps
from flask_jwt_extended import jwt_required, get_jwt
def admin_required(fn):
@wraps(fn)
@jwt_required()
def wrapper(*args, **kwargs):
claims = get_jwt()
if not claims.get('is_admin'):
return jsonify({'message': '需要管理员权限'}), 403
return fn(*args, **kwargs)
return wrapper
def team_leader_required(fn):
"""团队长权限检查装饰器"""
@wraps(fn)
@jwt_required()
def wrapper(*args, **kwargs):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
# 检查是否是团队长
if not user.is_team_leader:
return jsonify({'message': '需要团队长权限'}), 403
return fn(*args, **kwargs)
return wrapper
# 分类管理
@admin_bp.route('/categories', methods=['GET'])
@admin_required
def get_categories():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
search_query = request.args.get('search', type=str)
hierarchical = request.args.get('hierarchical', 'true').lower() == 'true'
query = Category.query
if search_query:
query = query.filter(Category.name.like(f'%{search_query}%'))
# 如果需要分页,并且不是层次化显示
if not hierarchical:
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
categories = pagination.items
return jsonify({
'categories': [category.to_dict() for category in categories],
'total_categories': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
}), 200
else:
# 获取所有分类
categories = query.all()
# 构建分类树结构
category_dict = {}
root_categories = []
# 首先将所有分类添加到字典中
for category in categories:
category_dict[category.id] = {
'id': category.id,
'name': category.name,
'description': category.description,
'parent_id': category.parent_id,
'level': category.level,
'path': category.path,
'children': []
}
# 然后构建树结构
for category_id, category_data in category_dict.items():
if category_data['parent_id'] is None:
root_categories.append(category_data)
else:
parent_id = category_data['parent_id']
if parent_id in category_dict:
parent = category_dict[parent_id]
parent['children'].append(category_data)
return jsonify({
'categories': root_categories
}), 200
@admin_bp.route('/categories', methods=['POST'])
@admin_required
def create_category():
data = request.get_json()
name = data.get('name')
description = data.get('description')
parent_id = data.get('parent_id')
if not name:
return jsonify({'message': '分类名称不能为空'}), 400
# 检查同一父级下是否有同名分类
if parent_id:
existing_category = Category.query.filter_by(name=name, parent_id=parent_id).first()
if existing_category:
return jsonify({'message': '该父级下已存在同名分类'}), 400
else:
# 检查顶级分类是否有同名
existing_category = Category.query.filter_by(name=name, parent_id=None).first()
if existing_category:
return jsonify({'message': '顶级分类中已存在同名分类'}), 400
parent_category = None
if parent_id:
parent_category = Category.query.get(parent_id)
if not parent_category:
return jsonify({'message': '父分类不存在'}), 404
# 计算层级和路径
level = 1
path = f'/{name}'
if parent_category:
level = parent_category.level + 1
path = f'{parent_category.path}/{name}'
category = Category(
name=name,
description=description,
parent_id=parent_id,
level=level,
path=path
)
db.session.add(category)
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='create_category',
details=f'创建分类: {category.name} (ID: {category.id})'
)
db.session.add(log)
db.session.commit()
return jsonify(category.to_dict()), 201
@admin_bp.route('/categories/<int:category_id>', methods=['PUT'])
@admin_required
def update_category(category_id):
try:
category = Category.query.get_or_404(category_id)
data = request.get_json()
name = data.get('name')
description = data.get('description')
parent_id = data.get('parent_id')
if name and name != category.name:
# 检查同一父级下是否有同名分类
if category.parent_id:
existing_category = Category.query.filter_by(name=name, parent_id=category.parent_id).first()
if existing_category:
return jsonify({'message': '该父级下已存在同名分类'}), 400
else:
# 检查顶级分类是否有同名
existing_category = Category.query.filter_by(name=name, parent_id=None).first()
if existing_category:
return jsonify({'message': '顶级分类中已存在同名分类'}), 400
category.name = name
# If name changes, path needs to be re-calculated later after parent_id is handled
if description is not None:
category.description = description
# 处理父分类变更
if parent_id is not None and parent_id != category.parent_id:
if parent_id == category.id:
return jsonify({'message': '不能设置自身为父分类'}), 400
new_parent_category = None
if parent_id != 0: # 0 表示设置为顶级分类
new_parent_category = Category.query.get(parent_id)
if not new_parent_category:
return jsonify({'message': '新的父分类不存在'}), 404
# 检查是否形成循环引用
current = new_parent_category
while current:
if current.id == category.id:
return jsonify({'message': '不能设置子分类为父分类'}), 400
current = current.parent
category.parent_id = parent_id if parent_id != 0 else None
# 更新层级
if new_parent_category:
category.level = new_parent_category.level + 1
else:
category.level = 1
# 重新计算path无论name或parent_id是否改变
if category.parent_id:
parent_category = Category.query.get(category.parent_id)
if parent_category:
category.path = f'{parent_category.path}/{category.name}'
else:
# This case should not happen if parent_id is set but parent_category is not found
# For safety, set to root path
category.path = f'/{category.name}'
else:
category.path = f'/{category.name}'
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='update_category',
details=f'更新分类 {category.name} (ID: {category.id}) 的信息: {data}'
)
db.session.add(log)
db.session.commit()
return jsonify(category.to_dict()), 200
except Exception as e:
db.session.rollback()
import sys
print(f"Error updating category {category_id}: {e}", file=sys.stderr)
return jsonify({'message': f'更新分类失败: {str(e)}'}), 500
@admin_bp.route('/categories/<int:category_id>', methods=['DELETE'])
@admin_required
def delete_category(category_id):
try:
category = Category.query.get_or_404(category_id)
admin_user_id = get_jwt_identity()
# 获取默认分类(未分类)
default_category = Category.get_default_category()
# 不允许删除默认分类
if category.id == default_category.id:
return jsonify({'message': '不能删除默认分类'}), 400
# 处理子分类
child_categories = Category.query.filter_by(parent_id=category_id).all()
for child in child_categories:
# 递归删除子分类
# 先获取子分类下的所有文件
child_files = File.query.filter_by(category_id=child.id).all()
for file in child_files:
file.category_id = default_category.id
# 记录文件迁移日志
log = OperationLog(
user_id=admin_user_id,
operation_type='move_file',
file_id=file.id,
category_id=default_category.id,
details=f'文件从分类 {child.name} 迁移至 {default_category.name}'
)
db.session.add(log)
# 删除子分类的权限
CategoryPermission.query.filter_by(category_id=child.id).delete()
# 删除子分类相关的操作日志
OperationLog.query.filter_by(category_id=child.id).delete()
# 记录子分类删除日志
log = OperationLog(
user_id=admin_user_id,
operation_type='delete_category',
details=f'删除子分类: {child.name} (ID: {child.id})'
)
db.session.add(log)
# 删除子分类
db.session.delete(child)
# 处理当前分类下的文件
files = File.query.filter_by(category_id=category_id).all()
for file in files:
file.category_id = default_category.id
# 记录文件迁移日志
log = OperationLog(
user_id=admin_user_id,
operation_type='move_file',
file_id=file.id,
category_id=default_category.id,
details=f'文件从分类 {category.name} 迁移至 {default_category.name}'
)
db.session.add(log)
# 删除分类的权限
CategoryPermission.query.filter_by(category_id=category_id).delete()
# 删除相关的操作日志
OperationLog.query.filter_by(category_id=category_id).delete()
# 删除分类
db.session.delete(category)
# 记录操作日志
log = OperationLog(
user_id=admin_user_id,
operation_type='delete_category',
details=f'删除分类: {category.name} (ID: {category.id})'
)
db.session.add(log)
db.session.commit()
return jsonify({'message': '分类删除成功,相关文件已迁移至未分类'}), 200
except Exception as e:
db.session.rollback()
import sys
print(f"Error deleting category {category_id}: {e}", file=sys.stderr)
return jsonify({'message': f'删除分类失败: {str(e)}'}), 500
# 分类权限管理API
@admin_bp.route('/category-permissions', methods=['GET'])
@admin_required
def get_category_permissions():
user_id = request.args.get('user_id', type=int)
category_id = request.args.get('category_id', type=int)
query = CategoryPermission.query
if user_id:
query = query.filter_by(user_id=user_id)
if category_id:
query = query.filter_by(category_id=category_id)
permissions = query.all()
return jsonify({
'permissions': [permission.to_dict() for permission in permissions]
}), 200
@admin_bp.route('/category-permissions', methods=['POST'])
@admin_required
def create_category_permission():
data = request.get_json()
user_id = data.get('user_id')
category_id = data.get('category_id')
inherit_to_children = data.get('inherit_to_children', True)
if not user_id or not category_id:
return jsonify({'message': '用户ID和分类ID不能为空'}), 400
# 检查用户和分类是否存在
user = User.query.get(user_id)
category = Category.query.get(category_id)
if not user:
return jsonify({'message': '用户不存在'}), 404
if not category:
return jsonify({'message': '分类不存在'}), 404
# 检查权限是否已存在
existing_permission = CategoryPermission.query.filter_by(
user_id=user_id, category_id=category_id
).first()
if existing_permission:
return jsonify({'message': '该用户已有此分类的权限'}), 400
# 创建新权限
permission = CategoryPermission(
user_id=user_id,
category_id=category_id,
inherit_to_children=inherit_to_children
)
db.session.add(permission)
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='create_permission',
category_id=category_id,
details=f'为用户 {user.username} 添加分类 {category.name} 的权限'
)
db.session.add(log)
db.session.commit()
return jsonify(permission.to_dict()), 201
@admin_bp.route('/category-permissions/<int:permission_id>', methods=['PUT'])
@admin_required
def update_category_permission(permission_id):
permission = CategoryPermission.query.get_or_404(permission_id)
data = request.get_json()
inherit_to_children = data.get('inherit_to_children')
if inherit_to_children is not None:
permission.inherit_to_children = inherit_to_children
db.session.add(permission)
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='update_permission',
category_id=permission.category_id,
details=f'更新用户 {permission.user.username} 对分类 {permission.category.name} 的权限'
)
db.session.add(log)
db.session.commit()
return jsonify(permission.to_dict()), 200
@admin_bp.route('/category-permissions/<int:permission_id>', methods=['DELETE'])
@admin_required
def delete_category_permission(permission_id):
permission = CategoryPermission.query.get_or_404(permission_id)
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='delete_permission',
category_id=permission.category_id,
details=f'删除用户 {permission.user.username} 对分类 {permission.category.name} 的权限'
)
db.session.delete(permission)
db.session.add(log)
db.session.commit()
return jsonify({'message': '权限删除成功'}), 200
@admin_bp.route('/users/<int:user_id>/category-permissions/batch', methods=['POST'])
@admin_required
def batch_assign_category_permissions(user_id):
user = User.query.get_or_404(user_id)
data = request.get_json()
category_ids = data.get('category_ids', [])
inherit_to_children = data.get('inherit_to_children', True)
if not category_ids:
return jsonify({'message': '分类ID列表不能为空'}), 400
admin_user_id = get_jwt_identity()
created_permissions = []
for category_id in category_ids:
category = Category.query.get(category_id)
if not category:
continue
# 检查权限是否已存在
existing_permission = CategoryPermission.query.filter_by(
user_id=user_id, category_id=category_id
).first()
if not existing_permission:
# 创建新权限
permission = CategoryPermission(
user_id=user_id,
category_id=category_id,
inherit_to_children=inherit_to_children
)
db.session.add(permission)
created_permissions.append(permission)
# 记录操作日志
log = OperationLog(
user_id=admin_user_id,
operation_type='create_permission',
category_id=category_id,
details=f'为用户 {user.username} 添加分类 {category.name} 的权限'
)
db.session.add(log)
db.session.commit()
return jsonify({
'message': f'成功为用户 {user.username} 批量分配 {len(created_permissions)} 个分类权限',
'permissions': [permission.to_dict() for permission in created_permissions]
}), 201
# 分类模板系统API
@admin_bp.route('/category-templates', methods=['GET'])
@admin_required
def get_category_templates():
# 预置模板列表
templates = [
{
'id': 'media',
'name': '媒体资源',
'description': '包含视频、音频、图片等媒体资源的分类结构',
'structure': [
{
'name': '媒体资源',
'children': [
{'name': '视频'},
{'name': '音频'},
{'name': '图片'}
]
}
]
},
{
'id': 'documents',
'name': '文档资料',
'description': '包含PDF、Word、Excel等文档资料的分类结构',
'structure': [
{
'name': '文档资料',
'children': [
{'name': 'PDF文档'},
{'name': 'Word文档'},
{'name': 'Excel表格'},
{'name': '演示文稿'}
]
}
]
}
]
return jsonify({'templates': templates}), 200
@admin_bp.route('/category-templates/<template_id>/apply', methods=['POST'])
@admin_required
def apply_category_template(template_id):
admin_user_id = get_jwt_identity()
# 预置模板定义
templates = {
'media': {
'name': '媒体资源',
'structure': [
{
'name': '媒体资源',
'children': [
{'name': '视频'},
{'name': '音频'},
{'name': '图片'}
]
}
]
},
'documents': {
'name': '文档资料',
'structure': [
{
'name': '文档资料',
'children': [
{'name': 'PDF文档'},
{'name': 'Word文档'},
{'name': 'Excel表格'},
{'name': '演示文稿'}
]
}
]
}
}
if template_id not in templates:
return jsonify({'message': '模板不存在'}), 404
template = templates[template_id]
created_categories = []
# 递归创建分类结构
def create_category_structure(structure, parent_id=None):
for item in structure:
# 检查分类是否已存在
existing = Category.query.filter_by(name=item['name'], parent_id=parent_id).first()
if existing:
parent_category = existing
else:
# 创建父分类
parent_category = Category(name=item['name'], parent_id=parent_id)
db.session.add(parent_category)
db.session.flush() # 获取ID
created_categories.append(parent_category)
# 记录操作日志
log = OperationLog(
user_id=admin_user_id,
operation_type='create_category',
category_id=parent_category.id,
details=f'从模板 {template["name"]} 创建分类: {parent_category.name}'
)
db.session.add(log)
# 如果有子分类,递归创建
if 'children' in item:
create_category_structure(item['children'], parent_category.id)
try:
create_category_structure(template['structure'])
db.session.commit()
return jsonify({
'message': f'成功应用模板 {template["name"]},创建了 {len(created_categories)} 个分类',
'categories': [category.to_dict() for category in created_categories]
}), 201
except Exception as e:
db.session.rollback()
return jsonify({'message': f'应用模板失败: {str(e)}'}), 500
# 用户管理
@admin_bp.route('/users', methods=['GET'])
@admin_required
def get_users():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# 限制每页数量只能是10, 20, 50
if per_page not in [10, 20, 50]:
per_page = 10
query = User.query
# 搜索功能
search_query = request.args.get('search', type=str)
if search_query:
query = query.filter(
(User.username.like(f'%{search_query}%')) |
(User.email.like(f'%{search_query}%'))
)
# 排序功能
sort_by = request.args.get('sort_by', 'created_at')
sort_order = request.args.get('sort_order', 'desc')
if sort_by == 'created_at':
if sort_order == 'desc':
query = query.order_by(User.created_at.desc())
else:
query = query.order_by(User.created_at.asc())
elif sort_by == 'username':
if sort_order == 'desc':
query = query.order_by(User.username.desc())
else:
query = query.order_by(User.username.asc())
# 可以添加更多排序选项
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
users = pagination.items
has_more = pagination.has_next
return jsonify({
'users': [user.to_dict() for user in users],
'has_more': has_more,
'total_users': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page
}), 200
@admin_bp.route('/users', methods=['POST'])
@admin_required
def create_user():
data = request.get_json()
if not all(k in data for k in ('username', 'email', 'password')):
return jsonify({'message': '缺少必要字段'}), 400
if User.query.filter_by(username=data['username']).first():
return jsonify({'message': '用户名已存在'}), 400
if User.query.filter_by(email=data['email']).first():
return jsonify({'message': '邮箱已被注册'}), 400
user = User(
username=data['username'],
email=data['email'],
is_admin=data.get('is_admin', False),
is_active=data.get('is_active', True),
daily_quota=data.get('daily_quota', 5)
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
# 记录操作日志
user_id = get_jwt_identity()
log = OperationLog(
user_id=user_id,
operation_type='update_settings',
details=f'更新系统设置: {data}'
)
db.session.add(log)
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='create_user',
details=f'创建用户: {user.username} (ID: {user.id})'
)
db.session.add(log)
db.session.commit()
return jsonify(user.to_dict()), 201
# 操作日志管理
@admin_bp.route('/operation_logs', methods=['GET'])
@admin_required
def get_operation_logs():
page = request.args.get('page', 1, type=int)
per_page = 20 # 每页日志数量
query = OperationLog.query.order_by(OperationLog.timestamp.desc())
# 搜索功能
search_query = request.args.get('search', type=str)
if search_query:
query = query.filter(
(OperationLog.operation_type.like(f'%{search_query}%')) |
(OperationLog.details.like(f'%{search_query}%'))
)
# 过滤功能
user_id = request.args.get('user_id', type=int)
if user_id:
query = query.filter_by(user_id=user_id)
# 时间范围过滤
start_date_str = request.args.get('start_date', type=str)
end_date_str = request.args.get('end_date', type=str)
if start_date_str:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
query = query.filter(OperationLog.timestamp >= start_date)
if end_date_str:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d') + timedelta(days=1) - timedelta(microseconds=1)
query = query.filter(OperationLog.timestamp <= end_date)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
logs = pagination.items
has_more = pagination.has_next
return jsonify({
'logs': [log.to_dict() for log in logs],
'has_more': has_more,
'total_logs': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page
}), 200
@admin_bp.route('/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
user = User.query.get_or_404(user_id)
data = request.get_json()
if 'username' in data and data['username'] != user.username:
if User.query.filter_by(username=data['username']).first():
return jsonify({'message': '用户名已存在'}), 400
user.username = data['username']
if 'email' in data and data['email'] != user.email:
if User.query.filter_by(email=data['email']).first():
return jsonify({'message': '邮箱已被注册'}), 400
user.email = data['email']
if 'is_admin' in data:
user.is_admin = data['is_admin']
if 'is_active' in data:
user.is_active = data['is_active']
if 'daily_quota' in data:
user.daily_quota = data['daily_quota']
if 'password' in data:
user.set_password(data['password'])
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='update_user',
details=f'更新用户 {user.username} (ID: {user.id}) 的信息: {data}'
)
db.session.add(log)
db.session.commit()
return jsonify(user.to_dict()), 200
@admin_bp.route('/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
# 防止删除自己
current_user_id = get_jwt_identity()
if user.id == current_user_id:
return jsonify({'message': '不能删除自己的账号'}), 400
# 删除用户上传的文件
for file_to_delete in user.uploaded_files:
# 删除实际文件
if os.path.exists(file_to_delete.file_path):
try:
os.remove(file_to_delete.file_path)
# 记录操作日志
log = OperationLog(
user_id=current_user_id,
operation_type='delete',
file_id=file_to_delete.id,
details=f'删除用户 {user.username} 的文件: {file_to_delete.original_filename}'
)
db.session.add(log)
except Exception as e:
current_app.logger.error(f"删除文件失败: {file_to_delete.file_path}, 错误: {str(e)}")
# 删除相关的下载记录
if file_to_delete.download:
db.session.delete(file_to_delete.download)
# 删除文件记录
db.session.delete(file_to_delete)
# 删除用户的下载记录
Download.query.filter_by(user_id=user.id).delete()
# 删除用户领取的文件关联
File.query.filter_by(taken_by=user.id).update({'taken_by': None, 'taken_at': None})
# 记录操作日志(在删除用户之前)
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='delete_user',
details=f'删除用户: {user.username} (ID: {user.id})'
)
db.session.add(log)
# 删除用户
db.session.delete(user)
db.session.commit()
return jsonify({'message': '用户已删除'}), 200
# 文件管理
@admin_bp.route('/files', methods=['GET'])
@admin_required
def get_all_files():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# 限制每页数量只能是10, 20, 50
if per_page not in [10, 20, 50]:
per_page = 20
query = File.query
# 搜索功能
search_query = request.args.get('search', type=str)
if search_query:
query = query.filter(
(File.original_filename.like(f'%{search_query}%')) |
(File.description.like(f'%{search_query}%'))
)
# 过滤功能
category_id = request.args.get('category_id', type=int)
if category_id:
query = query.filter_by(category_id=category_id)
# 排序功能
sort_by = request.args.get('sort_by', 'created_at')
sort_order = request.args.get('sort_order', 'desc')
if sort_by == 'created_at':
if sort_order == 'desc':
query = query.order_by(File.created_at.desc())
else:
query = query.order_by(File.created_at.asc())
elif sort_by == 'file_size':
if sort_order == 'desc':
query = query.order_by(File.file_size.desc())
else:
query = query.order_by(File.file_size.asc())
# 可以添加更多排序选项
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
files = pagination.items
has_more = pagination.has_next
return jsonify({
'files': [file.to_dict(include_path=True) for file in files],
'has_more': has_more,
'total_files': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page
}), 200
@admin_bp.route('/files', methods=['POST'])
@admin_required
def upload_file():
if 'file' not in request.files:
return jsonify({'message': '未选择文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'message': '未选择文件'}), 400
# 获取系统设置
max_file_size = SystemSettings.get_value('max_file_size', 10485760) # 默认10MB
max_files_per_user = SystemSettings.get_value('max_files_per_user', 100)
# 检查文件大小
file.seek(0, 2) # 移动到文件末尾
file_size = file.tell()
file.seek(0) # 重置文件指针
if file_size > max_file_size:
return jsonify({'message': f'文件大小超过限制(最大{max_file_size/1024/1024:.1f}MB'}), 400
# 检查用户上传文件数量
user_id = get_jwt_identity()
user_file_count = File.query.filter_by(uploaded_by=user_id).count()
if user_file_count >= max_files_per_user:
return jsonify({'message': f'已达到最大上传文件数量限制({max_files_per_user}个)'}), 400
if file:
# 计算文件哈希
hasher = hashlib.sha256()
for chunk in iter(lambda: file.read(4096), b''):
hasher.update(chunk)
file_hash = hasher.hexdigest()
file.seek(0) # 重置文件指针
# 检查文件是否已存在(通过哈希)
existing_file = File.query.filter_by(file_hash=file_hash).first()
if existing_file:
return jsonify({'message': '文件已存在', 'file': existing_file.to_dict()}), 409
# 获取分类ID
category_id = request.form.get('category_id')
if not category_id:
return jsonify({'message': '缺少分类ID'}), 400
category = Category.query.get(category_id)
if not category:
return jsonify({'message': '分类不存在'}), 404
# 生成唯一文件名
from uuid import uuid4
filename = f"{uuid4().hex}_{file.filename}"
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
# 确保上传目录存在
os.makedirs(current_app.config['UPLOAD_FOLDER'], exist_ok=True)
# 保存文件
file.save(file_path)
# 创建文件记录
new_file = File(
filename=filename,
original_filename=file.filename,
file_path=file_path,
description=request.form.get('description'),
file_size=file_size,
file_hash=file_hash,
category_id=category_id,
uploaded_by=user_id
)
db.session.add(new_file)
db.session.commit()
# 记录操作日志
log = OperationLog(
user_id=user_id,
operation_type='upload',
file_id=new_file.id,
details=f'上传文件: {new_file.original_filename}'
)
db.session.add(log)
db.session.commit()
return jsonify(new_file.to_dict()), 201
@admin_bp.route('/files/<int:file_id>/download', methods=['GET'])
@admin_required
def download_file(file_id):
file_record = File.query.get_or_404(file_id)
file_path = file_record.file_path
user_id = get_jwt_identity()
if not os.path.exists(file_path):
return jsonify({'message': '文件不存在'}), 404
try:
# 记录操作日志
log = OperationLog(
user_id=user_id,
operation_type='download',
file_id=file_record.id,
details=f'下载文件: {file_record.original_filename}'
)
db.session.add(log)
db.session.commit()
return send_file(file_path, as_attachment=True, download_name=file_record.original_filename)
except Exception as e:
current_app.logger.error(f"文件下载失败: {file_path}, 错误: {str(e)}")
return jsonify({'message': '文件下载失败'}), 500
@admin_bp.route('/files/<int:file_id>', methods=['PUT'])
@admin_required
def update_file(file_id):
file_record = File.query.get_or_404(file_id)
data = request.get_json()
if 'description' in data:
file_record.description = data['description']
if 'category_id' in data:
category_id = data['category_id']
if category_id:
category = Category.query.get(category_id)
if not category:
return jsonify({'message': '分类不存在'}), 404
file_record.category_id = category_id
else:
file_record.category_id = None
db.session.commit()
# 记录操作日志
user_id = get_jwt_identity()
log = OperationLog(
user_id=user_id,
operation_type='update',
file_id=file_record.id,
details=f'更新文件 {file_record.original_filename} 的信息: {data}'
)
db.session.add(log)
db.session.commit()
return jsonify(file_record.to_dict()), 200
@admin_bp.route('/files/<int:file_id>', methods=['DELETE'])
@admin_required
def delete_file(file_id):
file_record = File.query.get_or_404(file_id)
# 记录文件信息用于日志
original_filename = file_record.original_filename
file_path = file_record.file_path
# 尝试删除物理文件
file_deleted = False
if os.path.exists(file_path):
try:
os.remove(file_path)
file_deleted = True
except Exception as e:
current_app.logger.error(f"删除文件失败: {file_path}, 错误: {str(e)}")
# 即使文件删除失败,也继续删除数据库记录,但记录警告
current_app.logger.warning(f"文件不存在或无法删除,继续删除数据库记录: {file_path}")
else:
current_app.logger.warning(f"文件不存在: {file_path}")
try:
# 记录操作日志(在删除文件记录之前)
user_id = get_jwt_identity()
log = OperationLog(
user_id=user_id,
operation_type='delete',
file_id=file_record.id,
details=f'删除文件: {original_filename} (物理文件: {"已删除" if file_deleted else "不存在或删除失败"})'
)
db.session.add(log)
# 先删除相关的下载记录
if file_record.download:
db.session.delete(file_record.download)
# 删除数据库记录
db.session.delete(file_record)
db.session.commit()
return jsonify({
'message': '文件已删除',
'details': {
'filename': original_filename,
'file_deleted': file_deleted
}
}), 200
except Exception as e:
db.session.rollback()
current_app.logger.error(f"删除数据库记录失败: {str(e)}")
return jsonify({'message': '删除文件记录失败'}), 500
# 分类管理
# 系统设置
@admin_bp.route('/settings', methods=['GET'])
@admin_required
def get_settings():
settings = SystemSettings.query.all()
settings_dict = [s.to_dict() for s in settings]
# 将max_file_size从字节转换为MB返回给前端
for setting in settings_dict:
if setting['key'] == 'max_file_size' and isinstance(setting['value'], (int, float)):
setting['value'] = round(setting['value'] / (1024 * 1024))
return jsonify(settings_dict), 200
@admin_bp.route('/settings', methods=['PUT'])
@admin_required
def update_settings():
data = request.get_json()
for key, value in data.items():
setting = SystemSettings.query.filter_by(key=key).first()
if setting:
setting.value = value
else:
new_setting = SystemSettings(key=key, value=value)
db.session.add(new_setting)
# 如果更新了每日配额,同步更新所有用户的配额
if 'daily_quota' in data:
try:
new_quota = int(data['daily_quota'])
# 更新所有用户的每日配额
User.query.update({'daily_quota': new_quota})
current_app.logger.info(f"已更新所有用户的每日配额为: {new_quota}")
except ValueError:
current_app.logger.error(f"每日配额值无效: {data['daily_quota']}")
return jsonify({'message': '每日配额值无效'}), 400
db.session.commit()
# 记录操作日志
user_id = get_jwt_identity()
log = OperationLog(
user_id=user_id,
operation_type='update_settings',
details=f'更新系统设置: {data}'
)
db.session.add(log)
db.session.commit()
return jsonify({'message': '设置已更新'}), 200
# 统计分析
@admin_bp.route('/analytics/users', methods=['GET'])
@admin_required
def get_user_analytics():
total_users = User.query.count()
active_users = User.query.filter_by(is_active=True).count()
admin_users = User.query.filter_by(is_admin=True).count()
return jsonify({
'total_users': total_users,
'active_users': active_users,
'admin_users': admin_users
}), 200
@admin_bp.route('/analytics/downloads', methods=['GET'])
@admin_required
def get_download_analytics():
total_downloads = Download.query.count()
today = datetime.utcnow().date()
today_downloads = Download.query.filter(db.func.date(Download.download_time) == today).count()
return jsonify({
'total_downloads': total_downloads,
'today_downloads': today_downloads
}), 200
@admin_bp.route('/analytics/categories', methods=['GET'])
@admin_required
def get_category_analytics():
categories = Category.query.all()
category_data = []
for category in categories:
file_count = File.query.filter_by(category_id=category.id).count()
category_data.append({
'id': category.id,
'name': category.name,
'file_count': file_count
})
return jsonify(category_data), 200
@admin_bp.route('/analytics/system_health', methods=['GET'])
@admin_required
def get_system_health():
# 示例:检查数据库连接
try:
db.session.execute(db.text('SELECT 1'))
db_status = '正常'
except Exception as e:
db_status = f'异常: {str(e)}'
# 示例:检查上传目录是否存在和可写
upload_folder_path = current_app.config['UPLOAD_FOLDER']
upload_folder_status = '正常'
if not os.path.exists(upload_folder_path):
upload_folder_status = '不存在'
elif not os.access(upload_folder_path, os.W_OK):
upload_folder_status = '不可写'
return jsonify({
'database_status': db_status,
'upload_folder_status': upload_folder_status,
'timestamp': datetime.utcnow().isoformat()
}), 200
@admin_bp.route('/dashboard_summary', methods=['GET'])
@admin_required
def get_dashboard_summary():
total_users = User.query.count()
total_files = File.query.count()
total_downloads = Download.query.count()
total_file_size = db.session.query(db.func.sum(File.file_size)).scalar() or 0
return jsonify({
'total_users': total_users,
'total_files': total_files,
'total_downloads': total_downloads,
'total_file_size': total_file_size
}), 200
file = File.query.get_or_404(file_id)
# 删除文件记录前先删除实际文件
if os.path.exists(file.file_path):
try:
os.remove(file.file_path)
except Exception as e:
return jsonify({'message': f'删除文件失败: {str(e)}'}), 500
# 删除相关的下载记录
if file.download:
db.session.delete(file.download)
db.session.delete(file)
db.session.commit()
return jsonify({'message': '文件已删除'}), 200
# 数据分析
@admin_bp.route('/analytics/files', methods=['GET'])
@admin_required
def file_analytics():
# 总文件数
total_files = File.query.count()
# 已领取和未领取文件数
taken_files = File.query.filter_by(is_taken=True).count()
available_files = total_files - taken_files
# 最近7天上传的文件数
seven_days_ago = datetime.utcnow() - timedelta(days=7)
recent_uploads = File.query.filter(File.created_at >= seven_days_ago).count()
# 按日期统计上传数量
upload_stats = db.session.query(
db.func.date(File.created_at),
db.func.count(File.id)
).group_by(db.func.date(File.created_at)).all()
# 按日期统计领取数量
take_stats = db.session.query(
db.func.date(File.taken_at),
db.func.count(File.id)
).filter(File.taken_at.isnot(None)).group_by(db.func.date(File.taken_at)).all()
return jsonify({
'total_files': total_files,
'taken_files': taken_files,
'available_files': available_files,
'recent_uploads': recent_uploads,
'upload_stats': {date.strftime('%Y-%m-%d'): count for date, count in upload_stats},
'take_stats': {date.strftime('%Y-%m-%d'): count for date, count in take_stats}
}), 200
@admin_bp.route('/analytics/users', methods=['GET'])
@admin_required
def user_analytics():
# 总用户数
total_users = User.query.count()
# 活跃和非活跃用户数
active_users = User.query.filter_by(is_active=True).count()
inactive_users = total_users - active_users
# 管理员数量
admin_users = User.query.filter_by(is_admin=True).count()
# 用户领取统计
user_download_stats = db.session.query(
User.username,
db.func.count(Download.id)
).join(Download).group_by(User.username).all()
return jsonify({
'total_users': total_users,
'active_users': active_users,
'inactive_users': inactive_users,
'admin_users': admin_users,
'user_download_stats': {username: count for username, count in user_download_stats}
}), 200
@admin_bp.route('/categories', methods=['POST'])
def update_file_category(file_id):
file = File.query.get_or_404(file_id)
data = request.get_json()
if 'category_id' not in data:
return jsonify({'message': '缺少分类ID'}), 400
if data['category_id'] is not None:
category = Category.query.get(data['category_id'])
if not category:
return jsonify({'message': '分类不存在'}), 404
file.category_id = data['category_id']
db.session.commit()
return jsonify(file.to_dict()), 200
# 分类统计
@admin_bp.route('/analytics/categories', methods=['GET'])
@admin_required
def category_analytics():
# 获取所有分类及其文件数量
categories = db.session.query(
Category.id,
Category.name,
Category.path,
db.func.count(File.id)
).outerjoin(File).group_by(Category.id).all()
# 获取每个分类的下载次数
downloads = db.session.query(
File.category_id,
db.func.count(Download.id)
).join(Download).group_by(File.category_id).all()
download_stats = {category_id: count for category_id, count in downloads}
# 构建结果
result = []
for category_id, name, path, file_count in categories:
result.append({
'id': category_id,
'name': name,
'path': path,
'file_count': file_count,
'download_count': download_stats.get(category_id, 0)
})
return jsonify(result), 200
# 团队长管理相关API
@admin_bp.route('/team-leaders', methods=['GET'])
@admin_required
def get_team_leaders():
"""获取所有团队长列表"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
search_query = request.args.get('search', type=str)
query = User.query.filter_by(is_team_leader=True)
if search_query:
query = query.filter(User.username.like(f'%{search_query}%'))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
team_leaders = pagination.items
result = []
for leader in team_leaders:
# 获取团队成员数量
member_count = TeamMember.query.filter_by(leader_id=leader.id).count()
leader_data = leader.to_dict()
leader_data['member_count'] = member_count
result.append(leader_data)
return jsonify({
'team_leaders': result,
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
}), 200
@admin_bp.route('/users/<int:user_id>/team-leader', methods=['PUT'])
@admin_required
def set_team_leader(user_id):
"""设置用户为团队长或取消团队长权限"""
data = request.get_json()
is_team_leader = data.get('is_team_leader', False)
user = User.query.get_or_404(user_id)
# 如果取消团队长权限,需要先移除所有团队成员
if not is_team_leader and user.is_team_leader:
# 移除该团队长的所有成员
TeamMember.query.filter_by(leader_id=user_id).delete()
db.session.commit()
user.is_team_leader = is_team_leader
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
action = '设置为团队长' if is_team_leader else '取消团队长权限'
log = OperationLog(
user_id=admin_user_id,
operation_type='set_team_leader',
details=f'{action}: {user.username} (ID: {user.id})'
)
db.session.add(log)
db.session.commit()
return jsonify({
'message': f'用户 {user.username} 已成功{action}',
'user': user.to_dict()
}), 200
@admin_bp.route('/team-members', methods=['GET'])
@admin_required
def get_all_team_members():
"""获取所有团队成员关系"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
leader_id = request.args.get('leader_id', type=int)
query = TeamMember.query
if leader_id:
query = query.filter_by(leader_id=leader_id)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
members = pagination.items
result = []
for member in members:
member_data = member.to_dict()
result.append(member_data)
return jsonify({
'team_members': result,
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
}), 200
@admin_bp.route('/team-members', methods=['POST'])
@admin_required
def add_team_member():
"""添加团队成员"""
data = request.get_json()
leader_id = data.get('leader_id')
member_id = data.get('member_id')
if not leader_id or not member_id:
return jsonify({'message': '团队长ID和成员ID不能为空'}), 400
if leader_id == member_id:
return jsonify({'message': '团队长不能添加自己为成员'}), 400
# 检查团队长是否存在且有权限
leader = User.query.get(leader_id)
if not leader or not leader.is_team_leader:
return jsonify({'message': '指定的用户不是团队长'}), 400
# 检查成员是否存在
member = User.query.get(member_id)
if not member:
return jsonify({'message': '指定的成员用户不存在'}), 404
# 检查成员是否已有团队长
existing_member = TeamMember.query.filter_by(member_id=member_id).first()
if existing_member:
return jsonify({'message': '该用户已有团队长,请先移除现有团队关系'}), 400
# 创建团队关系
team_member = TeamMember(leader_id=leader_id, member_id=member_id)
db.session.add(team_member)
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='add_team_member',
details=f'添加团队成员: {member.username} (ID: {member.id}) 到团队长 {leader.username} (ID: {leader.id})'
)
db.session.add(log)
db.session.commit()
return jsonify({
'message': '团队成员添加成功',
'team_member': team_member.to_dict()
}), 201
@admin_bp.route('/team-members/<int:member_id>', methods=['DELETE'])
@admin_required
def remove_team_member(member_id):
"""移除团队成员"""
team_member = TeamMember.query.filter_by(member_id=member_id).first()
if not team_member:
return jsonify({'message': '未找到该团队成员关系'}), 404
member_name = team_member.member.username
leader_name = team_member.team_leader.username
db.session.delete(team_member)
db.session.commit()
# 记录操作日志
admin_user_id = get_jwt_identity()
log = OperationLog(
user_id=admin_user_id,
operation_type='remove_team_member',
details=f'移除团队成员: {member_name} (ID: {member_id}) 从团队长 {leader_name} (ID: {team_member.leader_id})'
)
db.session.add(log)
db.session.commit()
return jsonify({'message': '团队成员移除成功'}), 200
@admin_bp.route('/users/without-team', methods=['GET'])
@admin_required
def get_users_without_team():
"""获取没有团队的用户列表(用于添加团队成员)"""
# 获取已有团队的用户ID
existing_member_ids = db.session.query(TeamMember.member_id).all()
existing_ids = [id[0] for id in existing_member_ids]
# 查询没有团队的用户(排除管理员和已有团队的用户)
query = User.query.filter(
User.is_admin == False,
~User.id.in_(existing_ids) if existing_ids else True
)
users = query.all()
return jsonify({
'users': [{
'id': user.id,
'username': user.username,
'email': user.email,
'is_active': user.is_active
} for user in users]
}), 200