filesend/backend/app/routes/user.py

378 lines
13 KiB
Python
Raw Permalink Normal View History

2025-10-10 17:25:29 +08:00
from flask import Blueprint, jsonify, send_file, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..models import File, Download, User, SystemSettings, Category, CategoryPermission, db
from datetime import datetime, date, timedelta
import os
# Blueprint holding the user-facing endpoints. url_prefix mounts every route
# registered below under /api (an earlier version had no prefix).
user_bp = Blueprint('user', __name__, url_prefix='/api')
@user_bp.route('/files', methods=['GET'])
@jwt_required()
def get_available_files():
    """List unclaimed files in the categories the current user may access.

    Query parameters:
        page: page number (default 1).
        per_page: page size; only 10, 20 or 50 are accepted, every other
            value (including the default) falls back to 12.
        category_id: optional category to narrow the listing to. Its
            descendants are included as long as each one is authorized.

    Returns:
        200 with ``{'files': [...], 'has_more': bool}``,
        404 when ``category_id`` does not exist,
        403 when the user has no permission on ``category_id``.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 12, type=int)
    # Only these page sizes are honoured; anything else uses the fallback.
    if per_page not in (10, 20, 50):
        per_page = 12
    category_id = request.args.get('category_id', type=int)
    user_id = get_jwt_identity()

    # Fetch the category id and the inherit flag together so that no
    # permission row has to be queried a second time.
    permission_rows = (
        db.session.query(
            CategoryPermission.category_id,
            CategoryPermission.inherit_to_children,
        )
        .filter(CategoryPermission.user_id == user_id)
        .all()
    )
    authorized_category_ids = {row[0] for row in permission_rows}

    # Expand inherited permissions one tree level per query. The `expanded`
    # set guarantees termination even if parent_id links form a cycle.
    frontier = {
        row[0] for row in permission_rows
        if row[1] and Category.query.get(row[0]) is not None
    }
    expanded = set(frontier)
    while frontier:
        child_rows = (
            db.session.query(Category.id)
            .filter(Category.parent_id.in_(list(frontier)))
            .all()
        )
        child_ids = {row[0] for row in child_rows}
        authorized_category_ids.update(child_ids)
        frontier = child_ids - expanded
        expanded.update(frontier)

    # Without any permission there is nothing to list.
    if not authorized_category_ids:
        return jsonify({
            'files': [],
            'has_more': False
        }), 200

    query = (
        File.query.filter_by(is_taken=False)
        .filter(File.category_id.in_(list(authorized_category_ids)))
    )

    if category_id:
        category = Category.query.get(category_id)
        if not category:
            return jsonify({'message': '分类不存在'}), 404
        if category_id not in authorized_category_ids:
            return jsonify({'message': '无权访问该分类'}), 403

        # Collect the requested category plus its descendants, descending
        # only through children the user is authorized for.
        filtered_category_ids = {category_id}
        frontier = {category_id}
        while frontier:
            child_rows = (
                db.session.query(Category.id)
                .filter(Category.parent_id.in_(list(frontier)))
                .all()
            )
            frontier = {
                row[0] for row in child_rows
                if row[0] in authorized_category_ids
            } - filtered_category_ids
            filtered_category_ids.update(frontier)
        query = query.filter(File.category_id.in_(list(filtered_category_ids)))

    # A stable ordering keeps page boundaries deterministic between requests.
    pagination = query.order_by(File.id).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        'files': [file.to_dict() for file in pagination.items],
        'has_more': pagination.has_next
    }), 200
@user_bp.route('/files/<int:file_id>', methods=['GET'])
@jwt_required()
def get_file_details(file_id):
    """Return the serialized form of the file identified by ``file_id``.

    The lookup goes through ``get_or_404``; the body is the file's
    ``to_dict()`` output with status 200.
    """
    record = File.query.get_or_404(file_id)
    payload = record.to_dict()
    return jsonify(payload), 200
@user_bp.route('/files/<int:file_id>/take', methods=['POST'])
@jwt_required()
def take_file(file_id):
    """Claim a file for the current user and record the download.

    The user and the file are loaded with ``get_or_404``. The request is
    rejected with 400 when the file is already taken or when the user's
    claims for today have reached the daily quota.

    Returns:
        200 with ``message``, the serialized ``file`` and ``download_id``
        on success; 400 with a ``message`` otherwise.
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)
    file = File.query.get_or_404(file_id)

    # Cheap early exit; the authoritative check is the conditional UPDATE below.
    if file.is_taken:
        return jsonify({'message': '该文件已被领取'}), 400

    # Count today's downloads for the quota check.
    # NOTE(review): date.today() is server-local while taken_at is stored in
    # UTC; confirm which clock Download.download_time uses.
    today = date.today()
    start_of_day = datetime.combine(today, datetime.min.time())
    end_of_day = datetime.combine(today, datetime.max.time())
    today_downloads = Download.query.filter(
        Download.user_id == user_id,
        Download.download_time >= start_of_day,
        Download.download_time <= end_of_day
    ).count()

    # A missing or non-positive per-user quota means "use the system setting".
    daily_quota = user.daily_quota
    if daily_quota is None or daily_quota <= 0:
        daily_quota = SystemSettings.get_value('daily_quota', 5)
    if today_downloads >= daily_quota:
        return jsonify({
            'message': f'今日领取数量已达上限({daily_quota}个)',
            'today_downloads': today_downloads,
            'daily_quota': daily_quota
        }), 400

    # Claim atomically: the UPDATE only matches while the row is still
    # unclaimed, so of two concurrent requests exactly one gets a row count
    # of 1 and the other sees 0.
    claimed = File.query.filter_by(id=file_id, is_taken=False).update(
        {
            'is_taken': True,
            'taken_by': user_id,
            'taken_at': datetime.utcnow(),
        },
        synchronize_session=False,
    )
    if not claimed:
        db.session.rollback()
        return jsonify({'message': '该文件已被领取'}), 400

    download = Download(
        user_id=user_id,
        file_id=file_id
    )
    db.session.add(download)
    db.session.commit()

    # The bulk UPDATE bypassed the loaded instance; reload it before serializing.
    db.session.refresh(file)
    return jsonify({
        'message': '文件领取成功',
        'file': file.to_dict(),
        'download_id': download.id
    }), 200
@user_bp.route('/downloads/<int:download_id>', methods=['GET'])
@jwt_required()
def download_file(download_id):
    """Stream the file behind a download record to the user who owns it.

    Returns 403 when the record's ``user_id`` differs from the JWT identity
    and 404 when the stored path is not present on disk; otherwise the file
    is sent as an attachment under its original filename.
    """
    requester_id = get_jwt_identity()
    record = Download.query.get_or_404(download_id)

    # Only the user recorded on the download may fetch it.
    if record.user_id != requester_id:
        return jsonify({'message': '无权下载此文件'}), 403

    target = record.file
    if not os.path.exists(target.file_path):
        return jsonify({'message': '文件不存在'}), 404

    send_options = {
        'as_attachment': True,
        'download_name': target.original_filename,
        # Generic binary type so browsers download instead of previewing.
        'mimetype': 'application/octet-stream',
    }
    return send_file(target.file_path, **send_options)
@user_bp.route('/files/<int:file_id>/download', methods=['GET'])
@jwt_required()
def download_file_by_id(file_id):
    """Send a file to the current user, addressed by file id.

    The user must have a download record for the file and must hold a
    permission (direct, or inherited from an ancestor category) on the
    file's category.

    Returns:
        The file as an attachment; 403 when the download record or the
        category permission is missing; 404 when the file row (via
        ``get_or_404``) or the path on disk does not exist.
    """
    user_id = get_jwt_identity()

    # The user must have claimed this file before.
    download = Download.query.filter_by(
        user_id=user_id,
        file_id=file_id
    ).first()
    if not download:
        return jsonify({'message': '您没有下载此文件的权限'}), 403

    file = File.query.get_or_404(file_id)

    # Fetch the category id and the inherit flag together so that no
    # permission row has to be queried a second time.
    permission_rows = (
        db.session.query(
            CategoryPermission.category_id,
            CategoryPermission.inherit_to_children,
        )
        .filter(CategoryPermission.user_id == user_id)
        .all()
    )
    authorized_category_ids = {row[0] for row in permission_rows}

    # Expand inherited permissions one tree level per query. The `expanded`
    # set guarantees termination even if parent_id links form a cycle.
    frontier = {
        row[0] for row in permission_rows
        if row[1] and Category.query.get(row[0]) is not None
    }
    expanded = set(frontier)
    while frontier:
        child_rows = (
            db.session.query(Category.id)
            .filter(Category.parent_id.in_(list(frontier)))
            .all()
        )
        child_ids = {row[0] for row in child_rows}
        authorized_category_ids.update(child_ids)
        frontier = child_ids - expanded
        expanded.update(frontier)

    if file.category_id not in authorized_category_ids:
        return jsonify({'message': '您没有访问此文件所属分类的权限'}), 403

    if not os.path.exists(file.file_path):
        return jsonify({'message': '文件不存在'}), 404

    return send_file(
        file.file_path,
        as_attachment=True,
        download_name=file.original_filename,
        mimetype='application/octet-stream'  # force a download, not a preview
    )
@user_bp.route('/user/downloads', methods=['GET'])
@jwt_required()
def get_user_downloads():
    """Return the current user's download records, newest first.

    The response body is a JSON list of each record's ``to_dict()`` output.
    """
    current_user_id = get_jwt_identity()
    history = (
        Download.query
        .filter_by(user_id=current_user_id)
        .order_by(Download.download_time.desc())
        .all()
    )
    serialized = [entry.to_dict() for entry in history]
    return jsonify(serialized), 200
@user_bp.route('/user/quota', methods=['GET'])
@jwt_required()
def get_user_quota():
    """Report the current user's daily quota and today's usage.

    The quota is resolved the same way ``take_file`` enforces it: a missing
    or non-positive per-user value falls back to the ``daily_quota`` system
    setting (default 5).

    Returns:
        200 with ``daily_quota``, ``today_used`` and ``today_remaining``
        (never negative).
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)

    today = date.today()
    start_of_day = datetime.combine(today, datetime.min.time())
    end_of_day = datetime.combine(today, datetime.max.time())
    today_downloads = Download.query.filter(
        Download.user_id == user_id,
        Download.download_time >= start_of_day,
        Download.download_time <= end_of_day
    ).count()

    # Report the quota that is actually enforced when claiming a file.
    daily_quota = user.daily_quota
    if daily_quota is None or daily_quota <= 0:
        daily_quota = SystemSettings.get_value('daily_quota', 5)

    return jsonify({
        'daily_quota': daily_quota,
        'today_used': today_downloads,
        # Usage can exceed the quota if the quota was lowered during the day.
        'today_remaining': max(daily_quota - today_downloads, 0)
    }), 200
@user_bp.route('/user/categories', methods=['GET'])
@jwt_required()
def get_user_categories():
    """Return the categories the current user may access, as a tree.

    A category is authorized when the user holds a permission on it directly
    or when an ancestor's permission has ``inherit_to_children`` set.
    Authorized categories whose parent is not itself authorized (or that
    have no parent) become roots of the returned forest.

    Returns:
        200 with ``{'categories': [...]}``; each node carries ``id``,
        ``name``, ``parent_id``, ``level``, ``path`` and ``children``.
    """
    user_id = get_jwt_identity()

    # Fetch the category id and the inherit flag together so that no
    # permission row has to be queried a second time.
    permission_rows = (
        db.session.query(
            CategoryPermission.category_id,
            CategoryPermission.inherit_to_children,
        )
        .filter(CategoryPermission.user_id == user_id)
        .all()
    )
    authorized_category_ids = {row[0] for row in permission_rows}

    # Expand inherited permissions one tree level per query. The `expanded`
    # set guarantees termination even if parent_id links form a cycle.
    frontier = {
        row[0] for row in permission_rows
        if row[1] and Category.query.get(row[0]) is not None
    }
    expanded = set(frontier)
    while frontier:
        child_rows = (
            db.session.query(Category.id)
            .filter(Category.parent_id.in_(list(frontier)))
            .all()
        )
        child_ids = {row[0] for row in child_rows}
        authorized_category_ids.update(child_ids)
        frontier = child_ids - expanded
        expanded.update(frontier)

    # No permissions means an empty tree; skip the empty IN () query.
    if not authorized_category_ids:
        return jsonify({
            'categories': []
        }), 200

    categories = Category.query.filter(
        Category.id.in_(list(authorized_category_ids))
    ).all()

    # First index every authorized category by id...
    category_dict = {
        category.id: {
            'id': category.id,
            'name': category.name,
            'parent_id': category.parent_id,
            'level': category.level,
            'path': category.path,
            'children': []
        }
        for category in categories
    }

    # ...then attach each node to its parent, or treat it as a root when the
    # parent is absent from the authorized set.
    root_categories = []
    for category_data in category_dict.values():
        parent = category_dict.get(category_data['parent_id'])
        if category_data['parent_id'] is None or parent is None:
            root_categories.append(category_data)
        else:
            parent['children'].append(category_data)

    return jsonify({
        'categories': root_categories
    }), 200
# Profile update endpoint (added to user.py)
@user_bp.route('/user/profile', methods=['PUT'])
@jwt_required()
def update_profile():
    """Update the current user's email and/or password.

    Expects a JSON object. ``email`` changes the address unless another
    user already has it. A password change happens only when both
    ``current_password`` and ``new_password`` are supplied.

    Returns:
        200 on success; 400 for a non-object body, an email that is already
        registered, or a new password that is not a string of at least 6
        characters; 401 when ``current_password`` does not match.
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)

    # A missing, malformed or non-object body would otherwise crash the
    # membership tests below with a TypeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': '请求数据无效'}), 400

    # Validate everything first so a rejected request leaves `user` untouched.
    change_email = 'email' in data
    if change_email and data['email'] != user.email:
        # The address must not belong to another account.
        existing_user = User.query.filter_by(email=data['email']).first()
        if existing_user:
            return jsonify({'message': '邮箱已被注册'}), 400

    change_password = 'current_password' in data and 'new_password' in data
    if change_password:
        if not user.check_password(data['current_password']):
            return jsonify({'message': '当前密码错误'}), 401
        new_password = data['new_password']
        if not isinstance(new_password, str) or len(new_password) < 6:
            return jsonify({'message': '新密码长度至少为6位'}), 400

    if change_email:
        user.email = data['email']
    if change_password:
        user.set_password(data['new_password'])

    db.session.commit()
    return jsonify({'message': '资料更新成功'}), 200