285 lines
12 KiB
Python
285 lines
12 KiB
Python
from datetime import datetime
|
||
from . import db
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
from datetime import datetime
|
||
|
||
|
||
class User(db.Model):
    """Account record with role flags, a daily quota and team links.

    ``is_active`` defaults to False, so a freshly created row is inactive
    until the flag is switched on.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.Text, nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_team_leader = db.Column(db.Boolean, default=False)  # whether the user is a team leader
    is_active = db.Column(db.Boolean, default=False)  # requires activation by an administrator
    daily_quota = db.Column(db.Integer, default=5)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Insert and update timestamps both come from the Python-side UTC clock,
    # matching every other model in this module (previously the update path
    # used the database's CURRENT_TIMESTAMP, whose time zone is backend-defined).
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    # Files whose ``uploaded_by`` points at this user.
    uploaded_files = db.relationship('File', backref='uploader', lazy=True, foreign_keys='File.uploaded_by')
    # Files whose ``taken_by`` points at this user.
    taken_files = db.relationship('File', backref='taker', lazy=True, foreign_keys='File.taken_by')
    downloads = db.relationship('Download', backref='user', lazy=True)
    # Team membership rows where this user is the leader.
    team_members = db.relationship('TeamMember', backref='team_leader', lazy=True, foreign_keys='TeamMember.leader_id')
    # Team membership row where this user is the member (at most one).
    team_leader_rel = db.relationship('TeamMember', backref='member', lazy=True, foreign_keys='TeamMember.member_id', uselist=False)

    def set_password(self, password):
        """Hash *password* and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if *password* matches the stored hash.

        Returns False (instead of raising) when no hash has been stored yet.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def to_dict(self):
        """Return a JSON-serialisable dict of the public user fields.

        ``created_at`` is an ISO-8601 string, or None when the column has no
        value (e.g. the instance has not been flushed yet).
        """
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_admin': self.is_admin,
            'is_team_leader': self.is_team_leader,
            'is_active': self.is_active,
            'daily_quota': self.daily_quota,
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }
|
||
|
||
|
||
class File(db.Model):
    """Uploaded file record, its category, uploader and optional taker."""

    __tablename__ = 'files'

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)  # name the file is stored under
    original_filename = db.Column(db.String(255), nullable=False)  # name as originally uploaded
    file_path = db.Column(db.String(512), nullable=False)
    description = db.Column(db.Text, nullable=True)
    # Size in bytes.
    # NOTE(review): db.Integer is 32-bit on many backends, so sizes above
    # ~2 GiB may not fit — confirm against the deployed database.
    file_size = db.Column(db.Integer)
    file_hash = db.Column(db.String(64), unique=True, nullable=False)  # content hash, used for integrity checks and de-duplication
    is_taken = db.Column(db.Boolean, default=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)  # every file must belong to a category
    uploaded_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    taken_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
    taken_at = db.Column(db.DateTime)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    download = db.relationship('Download', backref='file', lazy=True, uselist=False)
    category = db.relationship('Category', backref='files')

    def to_dict(self, include_path=False):
        """Return a JSON-serialisable dict describing this file.

        Args:
            include_path: when true, also expose the storage path under
                the ``file_path`` key.

        Returns:
            dict with ids, names and ISO-8601 timestamps; related names and
            timestamps are None when the underlying value is missing.
        """
        data = {
            'id': self.id,
            'original_filename': self.original_filename,
            'description': self.description,
            'file_size': self.file_size,
            'file_hash': self.file_hash,
            'is_taken': self.is_taken,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'uploaded_by': self.uploader.username if self.uploader else None,
            'taken_by': self.taker.username if self.taker else None,
            'taken_at': self.taken_at.isoformat() if self.taken_at else None,
            # Guarded like taken_at: the column default is only applied at
            # INSERT time, so an unflushed instance still holds None here.
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }
        if include_path:
            data['file_path'] = self.file_path
        return data
|
||
|
||
|
||
class Download(db.Model):
    """Record of one user downloading one file."""

    __tablename__ = 'downloads'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=False)
    download_time = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return a JSON-serialisable dict for this download.

        ``user`` and ``file`` are backrefs declared on the related models.
        Fields derived from them, and ``download_time``, are None when the
        related object or value is missing, mirroring the guarded style of
        the other models' ``to_dict`` methods.
        """
        user = self.user
        file = self.file
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': user.username if user else None,
            'file_id': self.file_id,
            'original_filename': file.original_filename if file else None,
            'file_size': file.file_size if file else None,
            'download_time': self.download_time.isoformat() if self.download_time else None,
        }
|
||
|
||
|
||
class SystemSettings(db.Model):
    """Key/value configuration row; the value is stored as text with a type tag."""

    __tablename__ = 'system_settings'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=False)
    description = db.Column(db.String(255), nullable=True)
    data_type = db.Column(db.String(20), default='string')  # string, int, float, bool
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Return a JSON-serialisable dict of this setting (value stays raw text)."""
        return {
            'id': self.id,
            'key': self.key,
            'value': self.value,
            'description': self.description,
            'data_type': self.data_type,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }

    @classmethod
    def get_value(cls, key, default=None):
        """Return the setting stored under *key*, converted per its ``data_type``.

        Args:
            key: setting key to look up.
            default: returned as-is when no row exists for *key*.

        Returns:
            int / float / bool for the matching ``data_type``; the raw string
            for any other type. A bool is True only when the stored text is
            ``'true'`` (case-insensitive).

        Raises:
            ValueError: if the stored text cannot be parsed as int or float.
        """
        setting = cls.query.filter_by(key=key).first()
        if setting is None:
            return default

        if setting.data_type == 'int':
            return int(setting.value)
        if setting.data_type == 'float':
            return float(setting.value)
        if setting.data_type == 'bool':
            return setting.value.lower() == 'true'
        return setting.value

    @classmethod
    def set_value(cls, key, value, description=None, data_type=None):
        """Create or update the setting *key* and commit it.

        Args:
            key: setting key.
            value: new value; stored as ``str(value)``.
            description: optional description; a falsy value leaves the
                existing description untouched.
            data_type: optional type tag. When omitted, an existing row keeps
                its current ``data_type`` and a new row gets ``'string'``.
                (Previously an omitted ``data_type`` reset existing rows to
                ``'string'``, which made ``get_value`` stop converting them.)

        Returns:
            The persisted ``SystemSettings`` instance.

        Raises:
            Exception: whatever the commit raised, after the session has
                been rolled back.
        """
        setting = cls.query.filter_by(key=key).first()
        if setting is None:
            setting = cls(key=key)
            setting.data_type = data_type or 'string'
        elif data_type:
            setting.data_type = data_type

        setting.value = str(value)
        if description:
            setting.description = description

        db.session.add(setting)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller before propagating.
            db.session.rollback()
            raise
        return setting
|
||
|
||
|
||
class Category(db.Model):
    """Node in a category tree, linked to its parent via ``parent_id``."""

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)  # not unique: the same name may appear under different parents
    description = db.Column(db.Text, nullable=True)
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)  # parent category id
    level = db.Column(db.Integer, default=1, nullable=False)  # depth in the category tree
    path = db.Column(db.String(255), nullable=False, unique=True)  # category path, e.g. /root/category1/subcategory1
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    children = db.relationship('Category', backref=db.backref('parent', remote_side=[id]), lazy=True)
    permissions = db.relationship('CategoryPermission', backref='category', lazy=True, cascade='all, delete-orphan')

    def to_dict(self):
        """Return a JSON-serialisable dict of this category.

        Timestamps are ISO-8601 strings, or None when the column is empty.
        """
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'parent_id': self.parent_id,
            'level': self.level,
            'path': self.path,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }

    @classmethod
    def get_default_category(cls):
        """Return the root-level default category, creating it if absent.

        The lookup is restricted to rows without a parent: ``name`` is not
        unique, so a sub-category that happens to share the default name
        must not be mistaken for the default category.

        Raises:
            Exception: whatever the commit raised while creating the row,
                after the session has been rolled back.
        """
        default_category = cls.query.filter_by(name='未分类', parent_id=None).first()
        if default_category is None:
            default_category = cls(name='未分类', description='默认分类', path='/未分类')
            db.session.add(default_category)
            try:
                db.session.commit()
            except Exception:
                # Leave the session usable for the caller before propagating.
                db.session.rollback()
                raise
        return default_category
|
||
|
||
|
||
class CategoryPermission(db.Model):
    """Permission row linking one user to one category."""

    __tablename__ = 'category_permissions'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=False)
    inherit_to_children = db.Column(db.Boolean, default=True)  # whether the permission is inherited by child categories
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    user = db.relationship('User', backref='category_permissions')

    # Composite unique constraint: at most one permission row per (user, category).
    __table_args__ = (db.UniqueConstraint('user_id', 'category_id', name='uq_user_category'),)

    def to_dict(self):
        """Return a JSON-serialisable dict of this permission.

        Related names and timestamps are None when the underlying object or
        value is missing.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': self.user.username if self.user else None,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'inherit_to_children': self.inherit_to_children,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }
|
||
|
||
|
||
class OperationLog(db.Model):
    """Audit entry for an operation performed by a user."""

    __tablename__ = 'operation_logs'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    operation_type = db.Column(db.String(50), nullable=False)  # e.g., 'upload', 'download', 'delete', 'update'
    file_id = db.Column(db.Integer, db.ForeignKey('files.id'), nullable=True)  # Optional, for file-related operations
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), nullable=True)  # Optional, for category-related operations
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    details = db.Column(db.Text, nullable=True)  # Additional details, e.g., old/new values for updates

    # Relationships
    user = db.relationship('User', backref='operation_logs')
    file = db.relationship('File', backref='operation_logs')
    category = db.relationship('Category', backref='operation_logs')

    def to_dict(self):
        """Return a JSON-serialisable dict of this log entry.

        Names of related objects and the timestamp are None when the
        underlying object or value is missing.
        """
        return {
            'id': self.id,
            'user_id': self.user_id,
            'username': self.user.username if self.user else None,
            'operation_type': self.operation_type,
            'file_id': self.file_id,
            'original_filename': self.file.original_filename if self.file else None,
            'category_id': self.category_id,
            'category_name': self.category.name if self.category else None,
            'timestamp': self.timestamp.isoformat() if self.timestamp else None,
            'details': self.details,
        }
|
||
|
||
|
||
class TeamMember(db.Model):
    """Membership row linking a member user to a team-leader user."""

    __tablename__ = 'team_members'

    id = db.Column(db.Integer, primary_key=True)
    leader_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)  # team leader's user id
    member_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)  # team member's user id
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Unique constraint on member_id: a user can have at most one team leader.
    __table_args__ = (db.UniqueConstraint('member_id', name='uq_member_id'),)

    def to_dict(self):
        """Return a JSON-serialisable dict of this membership.

        ``team_leader`` and ``member`` are backrefs declared on ``User``.
        Usernames and timestamps are None when the underlying object or
        value is missing.
        """
        return {
            'id': self.id,
            'leader_id': self.leader_id,
            'leader_username': self.team_leader.username if self.team_leader else None,
            'member_id': self.member_id,
            'member_username': self.member.username if self.member else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }
|