filesend/backend/app/routes/user.py

378 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, jsonify, send_file, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..models import File, Download, User, SystemSettings, Category, CategoryPermission, db
from datetime import datetime, date, timedelta
import os
# Earlier definition without a URL prefix, kept for reference:
# user_bp = Blueprint('user', __name__)
user_bp = Blueprint('user', __name__, url_prefix='/api') # url_prefix='/api' added: every route on this blueprint is served under /api
@user_bp.route('/files', methods=['GET'])
@jwt_required()
def get_available_files():
    """List files that are not yet taken, limited to categories the user may access.

    Query parameters:
        page: page number passed to ``paginate`` (default 1).
        per_page: page size; only 10, 20 or 50 are honoured, every other
            value (including the default) falls back to 12.
        category_id: optional; narrow the listing to this category plus
            those of its descendants the user is authorised for.

    Returns:
        200 with ``{'files': [...], 'has_more': bool}``.
        404 when ``category_id`` does not name an existing category.
        403 when the user is not authorised for ``category_id``.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 12, type=int)
    if per_page not in (10, 20, 50):
        per_page = 12
    category_id = request.args.get('category_id', type=int)
    user_id = get_jwt_identity()

    def _descendant_ids(root_ids):
        """Return the ids of every category below ``root_ids``.

        Walks the tree one level per query and remembers visited ids, so a
        cyclic ``parent_id`` chain terminates instead of recursing forever.
        """
        found = set()
        frontier = set(root_ids)
        visited = set(frontier)
        while frontier:
            rows = db.session.query(Category.id)\
                .filter(Category.parent_id.in_(frontier)).all()
            children = {row[0] for row in rows}
            found |= children
            frontier = children - visited
            visited |= frontier
        return found

    # One query yields every grant together with its inheritance flag.
    grants = db.session.query(
        CategoryPermission.category_id,
        CategoryPermission.inherit_to_children,
    ).filter_by(user_id=user_id).all()
    authorized_category_ids = {cat_id for cat_id, _ in grants}

    # Inherited access: expand only grants whose category row still exists.
    inheriting_ids = {cat_id for cat_id, inherits in grants if inherits}
    if inheriting_ids:
        existing = db.session.query(Category.id)\
            .filter(Category.id.in_(inheriting_ids)).all()
        authorized_category_ids |= _descendant_ids(row[0] for row in existing)

    # No permission at all: answer with an empty page.
    if not authorized_category_ids:
        return jsonify({
            'files': [],
            'has_more': False
        }), 200

    query = File.query.filter_by(is_taken=False)\
        .filter(File.category_id.in_(authorized_category_ids))

    if category_id:
        if Category.query.get(category_id) is None:
            return jsonify({'message': '分类不存在'}), 404
        if category_id not in authorized_category_ids:
            return jsonify({'message': '无权访问该分类'}), 403
        # The requested category plus the authorised part of its subtree.
        filtered_category_ids = _descendant_ids([category_id]) & authorized_category_ids
        filtered_category_ids.add(category_id)
        query = query.filter(File.category_id.in_(filtered_category_ids))

    # NOTE(review): the query carries no ORDER BY, so page contents depend on
    # the database's default row order.
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return jsonify({
        'files': [file.to_dict() for file in pagination.items],
        'has_more': pagination.has_next
    }), 200
@user_bp.route('/files/<int:file_id>', methods=['GET'])
@jwt_required()
def get_file_details(file_id):
    """Return the serialised File with the given id, or 404 if there is none.

    NOTE(review): unlike the listing and download routes, no category
    permission is checked here - confirm that this is intended.
    """
    record = File.query.get_or_404(file_id)
    payload = record.to_dict()
    return jsonify(payload), 200
@user_bp.route('/files/<int:file_id>/take', methods=['POST'])
@jwt_required()
def take_file(file_id):
    """Claim a file for the current user, subject to the daily quota.

    Returns:
        200 with the updated file and the id of the new Download record.
        400 when the file is already taken or today's quota is used up.
        404 when the user or the file does not exist.

    NOTE(review): the "today" window is built from the server's local date
    while ``taken_at`` is stored with ``utcnow`` - confirm which clock
    ``Download.download_time`` uses.
    NOTE(review): no category permission is checked before claiming.
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)
    target = File.query.get_or_404(file_id)

    # A file can be claimed only once.
    if target.is_taken:
        return jsonify({'message': '该文件已被领取'}), 400

    # Count the downloads this user has recorded today.
    current_day = date.today()
    window_start = datetime.combine(current_day, datetime.min.time())
    window_end = datetime.combine(current_day, datetime.max.time())
    used_today = Download.query.filter(
        Download.user_id == user_id,
        Download.download_time >= window_start,
        Download.download_time <= window_end
    ).count()

    # A non-positive per-user quota means "use the system-wide setting".
    if user.daily_quota <= 0:
        quota = SystemSettings.get_value('daily_quota', 5)
    else:
        quota = user.daily_quota

    if used_today >= quota:
        rejection = {
            'message': f'今日领取数量已达上限({quota}个)',
            'today_downloads': used_today,
            'daily_quota': quota
        }
        return jsonify(rejection), 400

    # Mark the file as taken and record the download in one commit.
    target.is_taken = True
    target.taken_by = user_id
    target.taken_at = datetime.utcnow()
    record = Download(user_id=user_id, file_id=file_id)
    db.session.add(record)
    db.session.commit()

    confirmation = {
        'message': '文件领取成功',
        'file': target.to_dict(),
        'download_id': record.id
    }
    return jsonify(confirmation), 200
@user_bp.route('/downloads/<int:download_id>', methods=['GET'])
@jwt_required()
def download_file(download_id):
    """Send the file behind one of the current user's Download records.

    Returns:
        The file as an attachment on success.
        403 when the record belongs to another user.
        404 when the record is unknown or the file is missing on disk.
    """
    requester_id = get_jwt_identity()
    record = Download.query.get_or_404(download_id)

    # Only the owner of the download record may fetch it.
    if record.user_id != requester_id:
        return jsonify({'message': '无权下载此文件'}), 403

    stored = record.file
    if not os.path.exists(stored.file_path):
        return jsonify({'message': '文件不存在'}), 404

    send_options = {
        'as_attachment': True,
        'download_name': stored.original_filename,
        # A generic binary type makes browsers download instead of preview.
        'mimetype': 'application/octet-stream',
    }
    return send_file(stored.file_path, **send_options)
@user_bp.route('/files/<int:file_id>/download', methods=['GET'])
@jwt_required()
def download_file_by_id(file_id):
    """Send a file by id if the user has a Download record and category access.

    Returns:
        The file as an attachment on success.
        403 when the user has no Download record for the file, or is not
        authorised for the file's category.
        404 when the file is unknown or missing on disk.
    """
    user_id = get_jwt_identity()

    # The user must have a download record for this file.
    download = Download.query.filter_by(
        user_id=user_id,
        file_id=file_id
    ).first()
    if not download:
        return jsonify({'message': '您没有下载此文件的权限'}), 403

    file = File.query.get_or_404(file_id)

    def _descendant_ids(root_ids):
        """Return the ids of every category below ``root_ids``.

        Walks the tree one level per query and remembers visited ids, so a
        cyclic ``parent_id`` chain terminates instead of recursing forever.
        """
        found = set()
        frontier = set(root_ids)
        visited = set(frontier)
        while frontier:
            rows = db.session.query(Category.id)\
                .filter(Category.parent_id.in_(frontier)).all()
            children = {row[0] for row in rows}
            found |= children
            frontier = children - visited
            visited |= frontier
        return found

    # One query yields every grant together with its inheritance flag.
    grants = db.session.query(
        CategoryPermission.category_id,
        CategoryPermission.inherit_to_children,
    ).filter_by(user_id=user_id).all()
    authorized_category_ids = {cat_id for cat_id, _ in grants}

    # Walk the category tree only when a direct grant does not already settle it.
    if file.category_id not in authorized_category_ids:
        inheriting_ids = {cat_id for cat_id, inherits in grants if inherits}
        if inheriting_ids:
            # Expand only grants whose category row still exists.
            existing = db.session.query(Category.id)\
                .filter(Category.id.in_(inheriting_ids)).all()
            authorized_category_ids |= _descendant_ids(row[0] for row in existing)

    if file.category_id not in authorized_category_ids:
        return jsonify({'message': '您没有访问此文件所属分类的权限'}), 403

    if not os.path.exists(file.file_path):
        return jsonify({'message': '文件不存在'}), 404

    return send_file(
        file.file_path,
        as_attachment=True,
        download_name=file.original_filename,
        mimetype='application/octet-stream'  # force a download rather than an in-browser preview
    )
@user_bp.route('/user/downloads', methods=['GET'])
@jwt_required()
def get_user_downloads():
    """Return the current user's Download records, newest first."""
    owner_id = get_jwt_identity()
    history = Download.query.filter_by(user_id=owner_id)
    history = history.order_by(Download.download_time.desc())
    serialised = [entry.to_dict() for entry in history.all()]
    return jsonify(serialised), 200
@user_bp.route('/user/quota', methods=['GET'])
@jwt_required()
def get_user_quota():
    """Report the current user's daily quota and today's usage.

    Returns:
        200 with ``daily_quota``, ``today_used`` and ``today_remaining``
        (never negative); 404 when the user does not exist.
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)

    today = date.today()
    start_of_day = datetime.combine(today, datetime.min.time())
    end_of_day = datetime.combine(today, datetime.max.time())
    today_downloads = Download.query.filter(
        Download.user_id == user_id,
        Download.download_time >= start_of_day,
        Download.download_time <= end_of_day
    ).count()

    # Report the quota that take_file enforces: a non-positive (or unset)
    # per-user value falls back to the system-wide 'daily_quota' setting.
    daily_quota = user.daily_quota
    if daily_quota is None or daily_quota <= 0:
        daily_quota = SystemSettings.get_value('daily_quota', 5)

    return jsonify({
        'daily_quota': daily_quota,
        'today_used': today_downloads,
        # Usage can exceed the quota (e.g. after it was lowered); clamp at 0.
        'today_remaining': max(daily_quota - today_downloads, 0)
    }), 200
@user_bp.route('/user/categories', methods=['GET'])
@jwt_required()
def get_user_categories():
    """Return the categories the current user may access, as a tree.

    A category is listed at the top level when it has no parent or when
    its parent is not among the user's authorised categories.

    Returns:
        200 with ``{'categories': [...]}``; each node carries ``id``,
        ``name``, ``parent_id``, ``level``, ``path`` and ``children``.
    """
    user_id = get_jwt_identity()

    def _descendant_ids(root_ids):
        """Return the ids of every category below ``root_ids``.

        Walks the tree one level per query and remembers visited ids, so a
        cyclic ``parent_id`` chain terminates instead of recursing forever.
        """
        found = set()
        frontier = set(root_ids)
        visited = set(frontier)
        while frontier:
            rows = db.session.query(Category.id)\
                .filter(Category.parent_id.in_(frontier)).all()
            children = {row[0] for row in rows}
            found |= children
            frontier = children - visited
            visited |= frontier
        return found

    # One query yields every grant together with its inheritance flag.
    grants = db.session.query(
        CategoryPermission.category_id,
        CategoryPermission.inherit_to_children,
    ).filter_by(user_id=user_id).all()
    authorized_category_ids = {cat_id for cat_id, _ in grants}

    # Inherited access: expand only grants whose category row still exists.
    inheriting_ids = {cat_id for cat_id, inherits in grants if inherits}
    if inheriting_ids:
        existing = db.session.query(Category.id)\
            .filter(Category.id.in_(inheriting_ids)).all()
        authorized_category_ids |= _descendant_ids(row[0] for row in existing)

    # Nothing authorised: skip the lookup with an empty IN list.
    if not authorized_category_ids:
        return jsonify({
            'categories': []
        }), 200

    categories = Category.query.filter(Category.id.in_(authorized_category_ids)).all()

    # Index every authorised category by id before linking parents to children.
    category_dict = {
        category.id: {
            'id': category.id,
            'name': category.name,
            'parent_id': category.parent_id,
            'level': category.level,
            'path': category.path,
            'children': []
        }
        for category in categories
    }

    root_categories = []
    for category_data in category_dict.values():
        parent = category_dict.get(category_data['parent_id'])
        if category_data['parent_id'] is None or parent is None:
            root_categories.append(category_data)
        else:
            parent['children'].append(category_data)

    return jsonify({
        'categories': root_categories
    }), 200
@user_bp.route('/user/profile', methods=['PUT'])
@jwt_required()
def update_profile():
    """Update the current user's email and/or password.

    Expects a JSON object. ``email`` changes the address once it is
    confirmed not to belong to another user. ``current_password`` together
    with ``new_password`` changes the password; supplying only one of the
    two leaves the password untouched.

    Returns:
        200 on success.
        400 when the body is not a JSON object, the email is already
        registered, or the new password is not a string of 6+ characters.
        401 when ``current_password`` is wrong.
        404 when the user does not exist.
    """
    user_id = get_jwt_identity()
    user = User.query.get_or_404(user_id)

    # silent=True turns a missing or malformed body into None so it can be
    # rejected below instead of failing on the membership tests.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': '请求数据无效'}), 400

    # Validate everything first; the user object is modified only after all
    # checks have passed.
    email_supplied = 'email' in data
    if email_supplied and data['email'] != user.email:
        # The address must not already belong to some user.
        if User.query.filter_by(email=data['email']).first():
            return jsonify({'message': '邮箱已被注册'}), 400

    password_supplied = 'current_password' in data and 'new_password' in data
    if password_supplied:
        current_password = data['current_password']
        new_password = data['new_password']
        if not isinstance(current_password, str) or not user.check_password(current_password):
            return jsonify({'message': '当前密码错误'}), 401
        if not isinstance(new_password, str) or len(new_password) < 6:
            return jsonify({'message': '新密码长度至少为6位'}), 400

    if email_supplied:
        user.email = data['email']
    if password_supplied:
        user.set_password(data['new_password'])
    db.session.commit()
    return jsonify({'message': '资料更新成功'}), 200