# filesend/backend/app/models.py
# Repository viewer metadata: 285 lines, 12 KiB, Python.
# Last modified: 2025-10-10 17:25:29 +08:00
from datetime import datetime
from . import db
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
class User(db.Model):
    """Account record stored in the ``users`` table.

    Holds the login identity, the password hash produced by
    ``generate_password_hash``, role/activation flags, a daily quota and the
    relationships to files, downloads and team membership rows.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.Text, nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_team_leader = db.Column(db.Boolean, default=False)  # whether the user is a team leader
    is_active = db.Column(db.Boolean, default=False)  # must be activated by an administrator
    daily_quota = db.Column(db.Integer, default=5)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Use the same Python-side UTC clock for insert and update, as the other
    # models in this module do; a database-side CURRENT_TIMESTAMP is not
    # guaranteed to be UTC on every backend.
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    uploaded_files = db.relationship(
        'File', backref='uploader', lazy=True, foreign_keys='File.uploaded_by'
    )
    taken_files = db.relationship(
        'File', backref='taker', lazy=True, foreign_keys='File.taken_by'
    )
    downloads = db.relationship('Download', backref='user', lazy=True)
    # Team membership rows in which this user is the leader
    team_members = db.relationship(
        'TeamMember', backref='team_leader', lazy=True, foreign_keys='TeamMember.leader_id'
    )
    # Team membership row in which this user is the member (at most one)
    team_leader_rel = db.relationship(
        'TeamMember', backref='member', lazy=True,
        foreign_keys='TeamMember.member_id', uselist=False
    )

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash.

        Returns False (instead of raising) when no hash has been stored yet,
        e.g. on an instance whose ``set_password`` was never called.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def to_dict(self):
        """Return a JSON-serialisable dict of the public user fields.

        The password hash is deliberately not included. ``created_at`` is an
        ISO-8601 string, or None when the column default has not been applied
        yet (the default is only filled in when the row is inserted).
        """
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_admin': self.is_admin,
            'is_team_leader': self.is_team_leader,
            'is_active': self.is_active,
            'daily_quota': self.daily_quota,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }
class File(db.Model):
    """Uploaded file record stored in the ``files`` table.

    Tracks where the file is stored, who uploaded it, which category it
    belongs to, and whether (and by whom) it has been taken.
    """

    __tablename__ = 'files'

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)  # name the file is stored under
    original_filename = db.Column(db.String(255), nullable=False)  # name as uploaded
    file_path = db.Column(db.String(512), nullable=False)
    description = db.Column(db.Text, nullable=True)
    file_size = db.Column(db.Integer)  # size in bytes
    # File hash, used for integrity checking and de-duplication
    file_hash = db.Column(db.String(64), unique=True, nullable=False)
    is_taken = db.Column(db.Boolean, default=False)
    # Category field; every file must belong to a category
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
    uploaded_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    taken_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
    taken_at = db.Column(db.DateTime)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    download = db.relationship('Download', backref='file', lazy=True, uselist=False)
    category = db.relationship('Category', backref='files')

    def to_dict(self, include_path=False):
        """Return a JSON-serialisable dict describing the file.

        Args:
            include_path: When True, also include the ``file_path`` value;
                it is omitted by default.

        Returns:
            dict: File fields with related names resolved (category name,
            uploader/taker usernames). Missing relations and unset
            timestamps are reported as None. ``created_at`` can be unset on
            an instance that has not been inserted yet, because the column
            default is only applied at insert time.
        """
        data = {
            'id': self.id,
            'original_filename': self.original_filename,
            'description': self.description,
            'file_size': self.file_size,
            'file_hash': self.file_hash,
            'is_taken': self.is_taken,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'uploaded_by': self.uploader.username if self.uploader else None,
            'taken_by': self.taker.username if self.taker else None,
            'taken_at': self.taken_at.isoformat() if self.taken_at else None,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }
        if include_path:
            data['file_path'] = self.file_path
        return data
class Download(db.Model):
    """Download record stored in the ``downloads`` table.

    Links a user to a file together with the time of the download. The
    ``user`` and ``file`` attributes used below are backrefs declared on the
    ``User.downloads`` and ``File.download`` relationships.
    """

    __tablename__ = 'downloads'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=False)
    download_time = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return a JSON-serialisable dict describing the download.

        Related user/file details and the timestamp are reported as None when
        they are not available (relation not loaded/set, or the row has not
        been inserted yet so the column default has not been applied),
        matching the None-guarding used by the other models' ``to_dict``.
        """
        user = self.user
        file = self.file
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': user.username if user else None,
            'file_id': self.file_id,
            'original_filename': file.original_filename if file else None,
            'file_size': file.file_size if file else None,
            'download_time': self.download_time.isoformat() if self.download_time else None
        }
class SystemSettings(db.Model):
    """Key/value configuration entry stored in the ``system_settings`` table.

    Values are persisted as text; ``data_type`` records how ``get_value``
    should convert them back.
    """

    __tablename__ = 'system_settings'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=False)
    description = db.Column(db.String(255), nullable=True)
    data_type = db.Column(db.String(20), default='string')  # string, int, float, bool
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Return a JSON-serialisable dict of the setting.

        ``value`` is the raw stored text (not converted). ``updated_at`` is
        None when the column default has not been applied yet.
        """
        return {
            'id': self.id,
            'key': self.key,
            'value': self.value,
            'description': self.description,
            'data_type': self.data_type,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

    @classmethod
    def get_value(cls, key, default=None):
        """Return the setting stored under ``key``, converted per its type.

        Args:
            key: Setting key to look up.
            default: Value returned when no row exists for ``key``.

        Returns:
            The stored value converted to int, float or bool according to
            ``data_type``; any other ``data_type`` returns the raw string.

        Raises:
            ValueError: If the stored text cannot be parsed as the declared
                int/float type.
        """
        setting = cls.query.filter_by(key=key).first()
        if not setting:
            return default

        raw = setting.value
        if setting.data_type == 'int':
            return int(raw)
        if setting.data_type == 'float':
            return float(raw)
        if setting.data_type == 'bool':
            # set_value() stores str(value), so a truthy 1 is persisted as
            # '1'; accept the usual truthy spellings, not only 'true'.
            return raw.strip().lower() in ('true', '1', 'yes', 'on')
        return raw

    @classmethod
    def set_value(cls, key, value, description=None, data_type=None):
        """Create or update the setting stored under ``key`` and commit.

        Args:
            key: Setting key.
            value: New value; stored as ``str(value)``.
            description: Optional description; only applied when truthy.
            data_type: One of 'string', 'int', 'float', 'bool'. When omitted,
                an existing setting keeps its current type (so updating only
                the value of an 'int' setting no longer turns it into a
                'string'), and a new setting is created as 'string'.

        Returns:
            SystemSettings: The persisted setting row.

        Raises:
            Exception: Any error raised by the commit is re-raised after the
                session has been rolled back.
        """
        setting = cls.query.filter_by(key=key).first()
        is_new = setting is None
        if is_new:
            setting = cls(key=key)

        setting.value = str(value)
        if description:
            setting.description = description
        if data_type:
            setting.data_type = data_type
        elif is_new:
            setting.data_type = 'string'

        db.session.add(setting)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error.
            db.session.rollback()
            raise
        return setting
class Category(db.Model):
    """Hierarchical file category stored in the ``categories`` table.

    Categories form a tree through ``parent_id``; ``path`` is unique while
    ``name`` may repeat under different parents.
    """

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    # No unique constraint on name: different parents may have same-named children
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text, nullable=True)
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)  # parent category id
    level = db.Column(db.Integer, default=1, nullable=False)  # depth in the category tree
    # Category path, e.g. /root/category1/subcategory1
    path = db.Column(db.String(255), nullable=False, unique=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    children = db.relationship(
        'Category', backref=db.backref('parent', remote_side=[id]), lazy=True
    )
    permissions = db.relationship(
        'CategoryPermission', backref='category', lazy=True, cascade='all, delete-orphan'
    )

    def to_dict(self):
        """Return a JSON-serialisable dict of the category.

        Timestamps are ISO-8601 strings, or None when the column defaults
        have not been applied yet (they are only filled in at insert time).
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'parent_id': self.parent_id,
            'level': self.level,
            'path': self.path,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

    @classmethod
    def get_default_category(cls):
        """Return the default "uncategorised" category, creating it if needed.

        The lookup is restricted to root-level categories (``parent_id`` is
        NULL): names are not unique across parents, so a same-named nested
        category must not be mistaken for the default one.

        Returns:
            Category: The existing or newly created default category.

        Raises:
            Exception: Any error raised by the commit is re-raised after the
                session has been rolled back.
        """
        default_category = cls.query.filter_by(name='未分类', parent_id=None).first()
        if default_category:
            return default_category

        default_category = cls(name='未分类', description='默认分类', path='/未分类')
        db.session.add(default_category)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error.
            db.session.rollback()
            raise
        return default_category
class CategoryPermission(db.Model):
    """Per-user permission on a category (``category_permissions`` table).

    The ``category`` attribute used below is the backref declared on
    ``Category.permissions``.
    """

    __tablename__ = 'category_permissions'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
    inherit_to_children = db.Column(db.Boolean, default=True)  # whether child categories inherit the permission
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    user = db.relationship('User', backref='category_permissions')

    # Composite unique constraint: at most one permission row per (user, category)
    __table_args__ = (db.UniqueConstraint('user_id', 'category_id', name='uq_user_category'),)

    def to_dict(self):
        """Return a JSON-serialisable dict of the permission row.

        Related names and timestamps are reported as None when unavailable;
        timestamps are unset until the row is inserted, because the column
        defaults are only applied at insert time.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': self.user.username if self.user else None,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'inherit_to_children': self.inherit_to_children,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }
class OperationLog(db.Model):
    """Audit-log entry stored in the ``operation_logs`` table.

    Records which user performed which operation, optionally tied to a file
    and/or a category, with free-form details.
    """

    __tablename__ = 'operation_logs'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    operation_type = db.Column(db.String(50), nullable=False)  # e.g. 'upload', 'download', 'delete', 'update'
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=True)  # optional, for file-related operations
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)  # optional, for category-related operations
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    details = db.Column(db.Text, nullable=True)  # additional details, e.g. old/new values for updates

    # Relationships
    user = db.relationship('User', backref='operation_logs')
    file = db.relationship('File', backref='operation_logs')
    category = db.relationship('Category', backref='operation_logs')

    def to_dict(self):
        """Return a JSON-serialisable dict of the log entry.

        Related names are None when the relation is absent. ``timestamp`` is
        None when the column default has not been applied yet (it is only
        filled in when the row is inserted).
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': self.user.username if self.user else None,
            'operation_type': self.operation_type,
            'file_id': self.file_id,
            'original_filename': self.file.original_filename if self.file else None,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'timestamp': self.timestamp.isoformat() if self.timestamp else None,
            'details': self.details
        }
class TeamMember(db.Model):
    """Leader/member link stored in the ``team_members`` table.

    The ``team_leader`` and ``member`` attributes used below are backrefs
    declared on ``User.team_members`` and ``User.team_leader_rel``.
    """

    __tablename__ = 'team_members'

    id = db.Column(db.Integer, primary_key=True)
    leader_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)  # team leader's user id
    member_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)  # team member's user id
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Unique constraint on member_id: a user can belong to at most one leader
    __table_args__ = (db.UniqueConstraint('member_id', name='uq_member_id'),)

    def to_dict(self):
        """Return a JSON-serialisable dict of the membership row.

        Usernames are None when the related user is absent. Timestamps are
        None when the column defaults have not been applied yet (they are
        only filled in when the row is inserted).
        """
        return {
            'id': self.id,
            'leader_id': self.leader_id,
            'leader_username': self.team_leader.username if self.team_leader else None,
            'member_id': self.member_id,
            'member_username': self.member.username if self.member else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }