filesend/backend/app/models.py

285 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from . import db
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
class User(db.Model):
    """Account row stored in the ``users`` table.

    Holds credentials (a werkzeug password hash), role flags, an integer
    ``daily_quota`` and the relationships that link a user to files,
    downloads and team membership rows.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.Text, nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_team_leader = db.Column(db.Boolean, default=False)  # Whether the user is a team leader
    is_active = db.Column(db.Boolean, default=False)  # Requires activation by an administrator
    daily_quota = db.Column(db.Integer, default=5)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Use the same Python-side UTC clock for updates as for the insert default
    # (and as every other model in this module). The previous server-side
    # CURRENT_TIMESTAMP expression depends on the database session time zone,
    # so created_at and updated_at could end up in different time bases.
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    uploaded_files = db.relationship('File', backref='uploader', lazy=True, foreign_keys='File.uploaded_by')
    taken_files = db.relationship('File', backref='taker', lazy=True, foreign_keys='File.taken_by')
    downloads = db.relationship('Download', backref='user', lazy=True)
    # Team membership rows in which this user is the leader
    team_members = db.relationship('TeamMember', backref='team_leader', lazy=True, foreign_keys='TeamMember.leader_id')
    # Team membership row in which this user is the member (at most one)
    team_leader_rel = db.relationship('TeamMember', backref='member', lazy=True, foreign_keys='TeamMember.member_id', uselist=False)

    def set_password(self, password):
        """Hash ``password`` with werkzeug and store it in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def to_dict(self):
        """Return a JSON-friendly dict of the public user fields.

        ``password_hash`` is intentionally not included. ``created_at`` is
        rendered as an ISO 8601 string, or ``None`` when the column is unset
        (for example on an instance that has not been flushed yet).
        """
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_admin': self.is_admin,
            'is_team_leader': self.is_team_leader,
            'is_active': self.is_active,
            'daily_quota': self.daily_quota,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }
class File(db.Model):
    """Uploaded file row stored in the ``files`` table.

    The ``uploader`` and ``taker`` attributes used by :meth:`to_dict` are
    backrefs declared on ``User.uploaded_files`` and ``User.taken_files``.
    """

    __tablename__ = 'files'

    id = db.Column(db.Integer, primary_key=True)
    # Name under which the file is stored
    filename = db.Column(db.String(255), nullable=False)
    # Name the file had when it was uploaded
    original_filename = db.Column(db.String(255), nullable=False)
    file_path = db.Column(db.String(512), nullable=False)
    description = db.Column(db.Text, nullable=True)
    # Size in bytes
    file_size = db.Column(db.Integer)
    # File hash, used for integrity checks and de-duplication
    file_hash = db.Column(db.String(64), unique=True, nullable=False)
    is_taken = db.Column(db.Boolean, default=False)
    # Category field; every file must belong to a category
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
    uploaded_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    taken_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
    taken_at = db.Column(db.DateTime)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    download = db.relationship('Download', backref='file', lazy=True, uselist=False)
    category = db.relationship('Category', backref='files')  # Category relationship

    def to_dict(self, include_path=False):
        """Serialize the file to a plain dict.

        Related objects are flattened to names (``None`` when absent) and
        datetimes to ISO 8601 strings. ``file_path`` is only added when
        ``include_path`` is truthy.
        """
        if self.category:
            category_name = self.category.name
        else:
            category_name = None

        if self.uploader:
            uploader_name = self.uploader.username
        else:
            uploader_name = None

        if self.taker:
            taker_name = self.taker.username
        else:
            taker_name = None

        if self.taken_at:
            taken_stamp = self.taken_at.isoformat()
        else:
            taken_stamp = None

        data = {
            'id': self.id,
            'original_filename': self.original_filename,
            'description': self.description,
            'file_size': self.file_size,
            'file_hash': self.file_hash,
            'is_taken': self.is_taken,
            'category_id': self.category_id,
            'category_name': category_name,
            'uploaded_by': uploader_name,
            'taken_by': taker_name,
            'taken_at': taken_stamp,
            'created_at': self.created_at.isoformat()
        }
        if not include_path:
            return data
        data['file_path'] = self.file_path
        return data
class Download(db.Model):
    """Record of a user downloading a file, stored in ``downloads``.

    The ``user`` and ``file`` attributes read by :meth:`to_dict` are backrefs
    declared on ``User.downloads`` and ``File.download``.
    """

    __tablename__ = 'downloads'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=False)
    download_time = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialize the download record to a plain dict.

        Related-object fields and ``download_time`` fall back to ``None``
        when the related row or the timestamp is missing, matching the
        guarded style used by ``CategoryPermission.to_dict`` and
        ``OperationLog.to_dict`` instead of raising ``AttributeError``.
        """
        downloader = self.user
        downloaded = self.file
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': downloader.username if downloader else None,
            'file_id': self.file_id,
            'original_filename': downloaded.original_filename if downloaded else None,
            'file_size': downloaded.file_size if downloaded else None,
            'download_time': self.download_time.isoformat() if self.download_time else None
        }
class SystemSettings(db.Model):
    """Key/value configuration row stored in ``system_settings``.

    Values are persisted as text; ``data_type`` records how
    :meth:`get_value` should convert them back.
    """

    __tablename__ = 'system_settings'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=False)
    description = db.Column(db.String(255), nullable=True)
    data_type = db.Column(db.String(20), default='string')  # string, int, float, bool
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Serialize the setting to a plain dict (``value`` stays a string).

        ``updated_at`` is an ISO 8601 string, or ``None`` when unset.
        """
        return {
            'id': self.id,
            'key': self.key,
            'value': self.value,
            'description': self.description,
            'data_type': self.data_type,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

    @classmethod
    def get_value(cls, key, default=None):
        """Return the setting stored under ``key`` converted per its ``data_type``.

        Returns ``default`` when no row exists for ``key``. ``'int'`` and
        ``'float'`` go through ``int()`` / ``float()`` (which raise
        ``ValueError`` on malformed text); ``'bool'`` is True only when the
        stored text equals ``'true'`` case-insensitively; any other type
        returns the stored string unchanged.
        """
        setting = cls.query.filter_by(key=key).first()
        if not setting:
            return default
        if setting.data_type == 'int':
            return int(setting.value)
        elif setting.data_type == 'float':
            return float(setting.value)
        elif setting.data_type == 'bool':
            return setting.value.lower() == 'true'
        else:
            return setting.value

    @classmethod
    def set_value(cls, key, value, description=None, data_type=None):
        """Create or update the setting ``key`` and commit the session.

        Args:
            key: Unique setting name.
            value: New value; stored as ``str(value)``.
            description: Optional description; only written when truthy.
            data_type: One of ``'string'``, ``'int'``, ``'float'``, ``'bool'``.
                When omitted, a new setting is created as ``'string'`` and an
                existing setting keeps its current type.

        Returns:
            The persisted ``SystemSettings`` instance.
        """
        setting = cls.query.filter_by(key=key).first()
        if not setting:
            setting = cls(key=key)
            # New rows default to 'string', same as the column default.
            setting.data_type = data_type or 'string'
        elif data_type:
            # Only retype an existing setting when the caller asks for it.
            # With the former 'string' default, updating e.g. an int setting
            # without repeating its type silently turned it into a string.
            setting.data_type = data_type
        setting.value = str(value)
        if description:
            setting.description = description
        db.session.add(setting)
        db.session.commit()
        return setting
class Category(db.Model):
    """Hierarchical file category stored in the ``categories`` table.

    Rows reference their parent through ``parent_id``; ``path`` is unique
    while ``name`` is not, so siblings under different parents may share a
    name.
    """

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    # Not unique: categories under different parents may share a name
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text, nullable=True)
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)  # Parent category ID
    level = db.Column(db.Integer, default=1, nullable=False)  # Depth in the category tree
    path = db.Column(db.String(255), nullable=False, unique=True)  # Category path, e.g. /root/category1/subcategory1
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    children = db.relationship('Category', backref=db.backref('parent', remote_side=[id]), lazy=True)
    permissions = db.relationship('CategoryPermission', backref='category', lazy=True, cascade='all, delete-orphan')

    def to_dict(self):
        """Serialize the category to a plain dict.

        Timestamps are ISO 8601 strings, or ``None`` when unset.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'parent_id': self.parent_id,
            'level': self.level,
            'path': self.path,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

    @classmethod
    def get_default_category(cls):
        """Return the root-level default category, creating it if missing.

        The lookup is restricted to rows without a parent: ``name`` is not
        unique, so a name-only query could return a nested subcategory that
        happens to share the default name. A newly created default category
        is committed before it is returned.
        """
        default_category = cls.query.filter_by(name='未分类', parent_id=None).first()
        if not default_category:
            default_category = cls(name='未分类', description='默认分类', path='/未分类')
            db.session.add(default_category)
            db.session.commit()
        return default_category
class CategoryPermission(db.Model):
    """Grant linking one user to one category (``category_permissions``).

    The ``category`` attribute read by :meth:`to_dict` is the backref
    declared on ``Category.permissions``.
    """

    __tablename__ = 'category_permissions'

    # Composite unique constraint: at most one permission row per
    # (user, category) pair.
    __table_args__ = (
        db.UniqueConstraint('user_id', 'category_id', name='uq_user_category'),
    )

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
    # Whether the permission is inherited by child categories
    inherit_to_children = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    user = db.relationship('User', backref='category_permissions')

    def to_dict(self):
        """Serialize the permission, flattening user and category to names."""
        username = None
        if self.user:
            username = self.user.username

        category_name = None
        if self.category:
            category_name = self.category.name

        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': username,
            'category_id': self.category_id,
            'category_name': category_name,
            'inherit_to_children': self.inherit_to_children,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
class OperationLog(db.Model):
    """Audit entry stored in ``operation_logs``.

    Each row names the acting user and an operation type, and may point at
    a file and/or a category.
    """

    __tablename__ = 'operation_logs'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    # e.g. 'upload', 'download', 'delete', 'update'
    operation_type = db.Column(db.String(50), nullable=False)
    # Optional; set for file-related operations
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=True)
    # Optional; set for category-related operations
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # Free-form extra information, e.g. old/new values for updates
    details = db.Column(db.Text, nullable=True)

    user = db.relationship('User', backref='operation_logs')
    file = db.relationship('File', backref='operation_logs')
    category = db.relationship('Category', backref='operation_logs')

    def to_dict(self):
        """Serialize the log entry, flattening related rows to display names."""
        username = None
        if self.user:
            username = self.user.username

        original_filename = None
        if self.file:
            original_filename = self.file.original_filename

        category_name = None
        if self.category:
            category_name = self.category.name

        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': username,
            'operation_type': self.operation_type,
            'file_id': self.file_id,
            'original_filename': original_filename,
            'category_id': self.category_id,
            'category_name': category_name,
            'timestamp': self.timestamp.isoformat(),
            'details': self.details
        }
class TeamMember(db.Model):
    """Leader/member link between two users (``team_members``).

    The ``team_leader`` and ``member`` attributes read by :meth:`to_dict`
    are backrefs declared on ``User.team_members`` and
    ``User.team_leader_rel``.
    """

    __tablename__ = 'team_members'

    # Unique constraint on member_id: a user can have only one team leader.
    __table_args__ = (
        db.UniqueConstraint('member_id', name='uq_member_id'),
    )

    id = db.Column(db.Integer, primary_key=True)
    # Team leader's user ID
    leader_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    # Team member's user ID
    member_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Serialize the membership row, including both usernames if loaded."""
        leader_username = None
        if self.team_leader:
            leader_username = self.team_leader.username

        member_username = None
        if self.member:
            member_username = self.member.username

        return {
            'id': self.id,
            'leader_id': self.leader_id,
            'leader_username': leader_username,
            'member_id': self.member_id,
            'member_username': member_username,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }