174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
数据库迁移脚本:添加团队长功能
|
||
添加用户表字段:is_team_leader
|
||
创建团队关系表:team_members
|
||
"""
|
||
|
||
import sqlite3
|
||
import os
|
||
|
||
def migrate_database():
|
||
"""执行数据库迁移"""
|
||
# 数据库文件路径
|
||
db_path = os.path.join(os.path.dirname(__file__), '..', 'instance', 'filesend.db')
|
||
|
||
if not os.path.exists(db_path):
|
||
print(f"数据库文件不存在: {db_path}")
|
||
return False
|
||
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# 1. 添加 is_team_leader 字段到 users 表
|
||
print("1. 检查并添加 is_team_leader 字段到 users 表...")
|
||
cursor.execute("PRAGMA table_info(users)")
|
||
columns = [column[1] for column in cursor.fetchall()]
|
||
|
||
if 'is_team_leader' not in columns:
|
||
cursor.execute("""
|
||
ALTER TABLE users ADD COLUMN is_team_leader BOOLEAN DEFAULT 0
|
||
""")
|
||
print(" - 成功添加 is_team_leader 字段")
|
||
else:
|
||
print(" - is_team_leader 字段已存在,跳过")
|
||
|
||
# 2. 创建 team_members 表
|
||
print("2. 创建 team_members 表...")
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS team_members (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
leader_id INTEGER NOT NULL,
|
||
member_id INTEGER NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (leader_id) REFERENCES users (id) ON DELETE CASCADE,
|
||
FOREIGN KEY (member_id) REFERENCES users (id) ON DELETE CASCADE,
|
||
UNIQUE (member_id)
|
||
)
|
||
""")
|
||
print(" - 成功创建 team_members 表")
|
||
|
||
# 3. 创建索引优化查询性能
|
||
print("3. 创建索引...")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_team_members_leader_id ON team_members(leader_id)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_team_members_member_id ON team_members(member_id)
|
||
""")
|
||
print(" - 成功创建索引")
|
||
|
||
# 4. 更新现有管理员用户为团队长(可选)
|
||
print("4. 更新现有管理员用户为团队长...")
|
||
cursor.execute("""
|
||
UPDATE users SET is_team_leader = 1 WHERE is_admin = 1
|
||
""")
|
||
updated_rows = cursor.rowcount
|
||
print(f" - 更新了 {updated_rows} 个管理员用户为团队长")
|
||
|
||
# 提交事务
|
||
conn.commit()
|
||
print("\n数据库迁移完成!")
|
||
|
||
# 显示迁移结果
|
||
print("\n迁移结果:")
|
||
cursor.execute("SELECT COUNT(*) FROM users WHERE is_team_leader = 1")
|
||
team_leader_count = cursor.fetchone()[0]
|
||
print(f"- 团队长用户数量: {team_leader_count}")
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM team_members")
|
||
member_count = cursor.fetchone()[0]
|
||
print(f"- 团队成员数量: {member_count}")
|
||
|
||
return True
|
||
|
||
except sqlite3.Error as e:
|
||
print(f"数据库迁移失败: {e}")
|
||
conn.rollback()
|
||
return False
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
def rollback_migration():
|
||
"""回滚迁移(谨慎使用)"""
|
||
db_path = os.path.join(os.path.dirname(__file__), '..', 'instance', 'filesend.db')
|
||
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
print("开始回滚团队长功能迁移...")
|
||
|
||
# 删除 team_members 表
|
||
cursor.execute("DROP TABLE IF EXISTS team_members")
|
||
print("- 删除 team_members 表")
|
||
|
||
# 删除 users 表的 is_team_leader 字段
|
||
# SQLite 不支持直接删除列,需要重建表
|
||
print("- 重建 users 表(移除 is_team_leader 字段)...")
|
||
|
||
# 获取现有表结构
|
||
cursor.execute("PRAGMA table_info(users)")
|
||
columns = cursor.fetchall()
|
||
|
||
# 构建不包含 is_team_leader 的列定义
|
||
column_defs = []
|
||
for col in columns:
|
||
if col[1] != 'is_team_leader':
|
||
col_name = col[1]
|
||
col_type = col[2]
|
||
not_null = "NOT NULL" if col[3] else ""
|
||
default = f"DEFAULT {col[4]}" if col[4] is not None else ""
|
||
pk = "PRIMARY KEY" if col[5] else ""
|
||
column_defs.append(f"{col_name} {col_type} {not_null} {default} {pk}".strip())
|
||
|
||
# 创建新表
|
||
cursor.execute(f"""
|
||
CREATE TABLE users_new (
|
||
{', '.join(column_defs)}
|
||
)
|
||
""")
|
||
|
||
# 复制数据
|
||
column_names = [col[1] for col in columns if col[1] != 'is_team_leader']
|
||
cursor.execute(f"""
|
||
INSERT INTO users_new ({', '.join(column_names)})
|
||
SELECT {', '.join(column_names)} FROM users
|
||
""")
|
||
|
||
# 删除旧表,重命名新表
|
||
cursor.execute("DROP TABLE users")
|
||
cursor.execute("ALTER TABLE users_new RENAME TO users")
|
||
print("- 成功重建 users 表")
|
||
|
||
conn.commit()
|
||
print("回滚完成!")
|
||
|
||
except sqlite3.Error as e:
|
||
print(f"回滚失败: {e}")
|
||
conn.rollback()
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
|
||
if len(sys.argv) > 1 and sys.argv[1] == "rollback":
|
||
confirm = input("确定要回滚团队长功能迁移吗?这将删除所有团队数据!(yes/no): ")
|
||
if confirm.lower() == 'yes':
|
||
rollback_migration()
|
||
else:
|
||
print("取消回滚操作")
|
||
else:
|
||
print("开始执行团队长功能数据库迁移...")
|
||
print("=" * 50)
|
||
success = migrate_database()
|
||
if success:
|
||
print("\n✅ 迁移成功完成!")
|
||
else:
|
||
print("\n❌ 迁移失败!")
|
||
sys.exit(1) |