修复文件

This commit is contained in:
taiyi 2025-11-11 23:04:01 +08:00
parent a398ebd0c9
commit f14b36e9b3
24 changed files with 1979 additions and 59 deletions

1
.env
View File

@ -7,7 +7,6 @@ FLASK_DEBUG=True
# 数据库配置
# DATABASE_URL=sqlite:///kamaxitong.db
DATABASE_URL=mysql+pymysql://root:taiyi1224@localhost/kamaxitong
# 安全配置

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

262
MYSQL_CONFIG_GUIDE.md Normal file
View File

@ -0,0 +1,262 @@
# MySQL数据库配置指南
## 概述
系统已经配置为使用MySQL数据库并从`.env`文件读取配置。
## 配置步骤
### 1. 确保安装MySQL客户端依赖
安装PyMySQLMySQL Python驱动
```bash
pip install PyMySQL
# 或者
pip install pymysql
```
### 2. 配置.env文件
编辑项目根目录的`.env`文件:
```env
# 数据库配置
# 格式: mysql+pymysql://用户名:密码@主机:端口/数据库名
DATABASE_URL=mysql+pymysql://root:你的密码@localhost/kamaxitong
# 示例:
DATABASE_URL=mysql+pymysql://root:taiyi1224@localhost/kamaxitong
```
### 3. 创建数据库
在MySQL中创建数据库
```sql
-- 登录MySQL
mysql -u root -p
-- 创建数据库
CREATE DATABASE kamaxitong CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 授权(可选)
GRANT ALL PRIVILEGES ON kamaxitong.* TO 'root'@'localhost';
FLUSH PRIVILEGES;
```
### 4. 更新.env文件中的数据库URL
`.env`文件中的`DATABASE_URL`格式:
```env
# 标准格式
DATABASE_URL=mysql+pymysql://用户名:密码@主机:端口/数据库名
# 本地示例
DATABASE_URL=mysql+pymysql://root:taiyi1224@localhost/kamaxitong
# 远程示例
DATABASE_URL=mysql+pymysql://root:password@192.168.1.100:3306/kamaxitong
```
### 5. 初始化数据库
#### 方式1: 使用Flask-Migrate推荐
```bash
# 初始化迁移(如果还没有)
flask db init
# 生成迁移文件
flask db migrate -m "Initial migration"
# 应用迁移
flask db upgrade
```
#### 方式2: 使用快速修复脚本
```bash
python quick_fix.py
```
### 6. 启动应用
```bash
python run.py
```
## 数据库配置参数说明
| 参数 | 说明 | 示例 |
|------|------|------|
| 数据库类型 | mysql+pymysql | mysql+pymysql |
| 用户名 | MySQL用户名 | root |
| 密码 | MySQL密码 | taiyi1224 |
| 主机 | 数据库主机 | localhost 或 192.168.1.100 |
| 端口 | MySQL端口可选 | 3306默认 |
| 数据库名 | 要使用的数据库名 | kamaxitong |
## 完整的.env文件示例
```env
# 环境配置
FLASK_ENV=development
FLASK_DEBUG=True
# 数据库配置 - MySQL
DATABASE_URL=mysql+pymysql://root:taiyi1224@localhost/kamaxitong
# 安全配置
SECRET_KEY=taiyi1224
AUTH_SECRET_KEY=taiyi1224
# 验证器配置
OFFLINE_CACHE_DAYS=7
MAX_FAILED_ATTEMPTS=5
LOCKOUT_MINUTES=10
MAX_UNBIND_TIMES=3
# 卡密配置
LICENSE_KEY_LENGTH=32
LICENSE_KEY_PREFIX=
# API配置
API_VERSION=v1
ITEMS_PER_PAGE=20
# 服务器配置
HOST=0.0.0.0
PORT=5000
# 文件上传配置
MAX_CONTENT_LENGTH=16777216
UPLOAD_FOLDER=static/uploads
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=logs/kamaxitong.log
```
## 验证配置
### 1. 检查.env文件是否被加载
启动应用时查看控制台输出:
```
成功加载.env文件
```
### 2. 验证数据库连接
在Python中测试
```python
from app import create_app
from app import db
app = create_app()
with app.app_context():
print("数据库URI:", app.config['SQLALCHEMY_DATABASE_URI'])
db.create_all() # 测试连接
print("数据库连接成功!")
```
## 常见问题
### Q1: ImportError: No module named 'pymysql'
**解决方案:**
```bash
pip install pymysql
```
### Q2: 1049 (42000): Unknown database 'kamaxitong'
**解决方案:**
```sql
CREATE DATABASE kamaxitong CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```
### Q3: 1045 (28000): Access denied for user 'root'@'localhost'
**解决方案:**
1. 检查.env文件中的用户名和密码是否正确
2. 确保MySQL用户有权限访问数据库
3. 重置MySQL root密码
### Q4: 2003 (HY000): Can't connect to MySQL server
**解决方案:**
1. 确保MySQL服务已启动
2. 检查主机和端口是否正确
3. 检查防火墙设置
### Q5: .env文件没有被加载
**解决方案:**
1. 确保python-dotenv已安装`pip install python-dotenv`
2. 确保.env文件在项目根目录
3. 重启应用
## 性能优化建议
### 1. 配置MySQL连接池
在config.py中已经配置
```python
SQLALCHEMY_ENGINE_OPTIONS = {
"future": True,
"pool_pre_ping": True,
"pool_size": 10,
"pool_recycle": 3600,
"max_overflow": 20
}
```
### 2. 启用MySQL查询缓存
在MySQL配置文件中my.cnf
```ini
[mysqld]
query_cache_type = 1
query_cache_size = 256M
```
### 3. 设置合适的字符集
确保数据库、表、列都使用utf8mb4字符集
```sql
CREATE DATABASE kamaxitong
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci;
```
## 备份和恢复
### 备份数据库
```bash
mysqldump -u root -p kamaxitong > backup.sql
```
### 恢复数据库
```bash
mysql -u root -p kamaxitong < backup.sql
```
## 总结
系统已经配置为使用MySQL和.env文件
- ✅ .env文件支持
- ✅ MySQL配置已就绪
- ✅ 从环境变量读取配置
- ✅ 支持开发/生产/测试环境
只需确保MySQL服务运行、依赖安装正确、.env文件配置正确即可

View File

@ -1,10 +1,49 @@
from flask import request, jsonify, current_app
from app import db
from app.models import Admin
from app.models import Admin, AuditLog
from . import api_bp
from flask_login import current_user, login_required
from werkzeug.security import generate_password_hash
import functools
import re
# 响应码定义
class ResponseCode:
SUCCESS = 0
VALIDATION_ERROR = 1001
AUTHENTICATION_FAILED = 1002
PERMISSION_DENIED = 1003
NOT_FOUND = 1004
SERVER_ERROR = 5000
DUPLICATE_USERNAME = 2001
INVALID_DATA = 2002
CANNOT_DELETE_SELF = 2003
CANNOT_DISABLE_SELF = 2004
OPERATION_FAILED = 3001
def create_response(success, data=None, message='', code=ResponseCode.SUCCESS, status_code=200):
"""统一的响应格式"""
return jsonify({
'success': success,
'data': data,
'message': message,
'code': code
}), status_code
def handle_exceptions(f):
"""异常处理装饰器"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
try:
return f(*args, **kwargs)
except ValueError as e:
current_app.logger.error(f"数据验证错误: {str(e)}")
return create_response(False, message=str(e), code=ResponseCode.VALIDATION_ERROR, status_code=400)
except Exception as e:
db.session.rollback()
current_app.logger.error(f"服务器错误: {str(e)}", exc_info=True)
return create_response(False, message='服务器内部错误,请稍后重试', code=ResponseCode.SERVER_ERROR, status_code=500)
return decorated_function
def require_admin(f):
"""管理员权限验证装饰器"""
@ -12,54 +51,346 @@ def require_admin(f):
def decorated_function(*args, **kwargs):
# 检查用户是否已认证
if not current_user.is_authenticated:
return jsonify({
'success': False,
'message': '需要登录'
}), 401
return create_response(False, message='需要登录', code=ResponseCode.AUTHENTICATION_FAILED, status_code=401)
# 检查是否为超级管理员
if not current_user.is_super_admin():
return jsonify({
'success': False,
'message': '需要超级管理员权限'
}), 403
return create_response(False, message='需要超级管理员权限', code=ResponseCode.PERMISSION_DENIED, status_code=403)
return f(*args, **kwargs)
return decorated_function
def log_audit(action, target_type, target_id=None, details=None):
"""记录审计日志"""
try:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
user_agent = request.headers.get('User-Agent', '')
AuditLog.log_action(
admin_id=current_user.admin_id,
action=action,
target_type=target_type,
target_id=target_id,
details=details,
ip_address=ip_address,
user_agent=user_agent
)
except Exception as e:
current_app.logger.error(f"记录审计日志失败: {str(e)}")
def validate_username(username):
"""验证用户名"""
if not username or not username.strip():
return False, '用户名不能为空'
if len(username.strip()) < 3:
return False, '用户名至少3个字符'
if len(username.strip()) > 32:
return False, '用户名不能超过32个字符'
if not re.match(r'^[a-zA-Z0-9_]+$', username.strip()):
return False, '用户名只能包含字母、数字和下划线'
return True, ''
def validate_password(password):
"""验证密码强度"""
if not password or not password.strip():
return False, '密码不能为空'
if len(password) < 6:
return False, '密码长度至少6位'
if len(password) > 128:
return False, '密码长度不能超过128个字符'
return True, ''
def validate_email(email):
"""验证邮箱"""
if email and '@' in email:
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return False, '邮箱格式不正确'
return True, ''
def validate_admin_data(data, is_create=True):
"""验证管理员数据"""
if not data:
return False, '请求数据为空'
return False, '请求数据为空', ResponseCode.VALIDATION_ERROR
# 验证用户名
username = data.get('username', '').strip()
is_valid, msg = validate_username(username)
if not is_valid:
return False, msg, ResponseCode.VALIDATION_ERROR
# 检查用户名是否已存在
if is_create:
username = data.get('username', '').strip()
if not username:
return False, '用户名不能为空'
# 检查用户名是否已存在
existing = Admin.query.filter_by(username=username).first()
existing = Admin.query.filter(Admin.username == username, Admin.is_deleted == 0).first()
if existing:
return False, '用户名已存在'
# 检查密码是否为空
password = data.get('password', '')
if not password or not password.strip():
return False, '密码不能为空'
# 验证邮箱格式(如果提供)
return False, '用户名已存在', ResponseCode.DUPLICATE_USERNAME
else:
# 编辑时检查其他用户是否使用相同用户名
admin_id = data.get('admin_id')
existing = Admin.query.filter(
Admin.username == username,
Admin.is_deleted == 0,
Admin.admin_id != admin_id
).first()
if existing:
return False, '用户名已存在', ResponseCode.DUPLICATE_USERNAME
# 验证邮箱
email = data.get('email', '').strip()
if email and '@' not in email:
return False, '邮箱格式不正确'
is_valid, msg = validate_email(email)
if not is_valid:
return False, msg, ResponseCode.VALIDATION_ERROR
# 验证密码
if is_create:
password = data.get('password', '')
is_valid, msg = validate_password(password)
if not is_valid:
return False, msg, ResponseCode.VALIDATION_ERROR
else:
# 编辑时,如果提供了密码则验证
password = data.get('password', '')
if password:
is_valid, msg = validate_password(password)
if not is_valid:
return False, msg, ResponseCode.VALIDATION_ERROR
# 验证角色
role = data.get('role')
if role is not None and role not in [0, 1]:
return False, '角色值无效'
return False, '角色值无效', ResponseCode.VALIDATION_ERROR
# 验证状态
status = data.get('status')
if status is not None and status not in [0, 1]:
return False, '状态值无效'
return True, ''
return False, '状态值无效', ResponseCode.VALIDATION_ERROR
return True, '', ResponseCode.SUCCESS
@api_bp.route('/admins', methods=['GET'])
@require_admin
@handle_exceptions
def get_admins():
"""获取管理员列表"""
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
keyword = request.args.get('keyword', '').strip()
role = request.args.get('role', type=int) if request.args.get('role') != '' else None
status = request.args.get('status', type=int) if request.args.get('status') != '' else None
query = Admin.get_query()
# 关键词搜索
if keyword:
query = query.filter(Admin.username.contains(keyword))
# 角色筛选
if role is not None:
query = query.filter(Admin.role == role)
# 状态筛选
if status is not None:
query = query.filter(Admin.status == status)
# 分页
pagination = query.order_by(Admin.create_time.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
admins = [admin.to_dict() for admin in pagination.items]
return create_response(True, data={
'admins': admins,
'pagination': {
'page': page,
'per_page': per_page,
'total': pagination.total,
'pages': pagination.pages,
'has_prev': pagination.has_prev,
'has_next': pagination.has_next
}
})
@api_bp.route('/admins', methods=['POST'])
@require_admin
@handle_exceptions
def create_admin():
"""创建管理员"""
data = request.get_json()
if not data:
return create_response(False, message='请求数据为空', code=ResponseCode.VALIDATION_ERROR, status_code=400)
# 验证数据
is_valid, message, code = validate_admin_data(data, is_create=True)
if not is_valid:
return create_response(False, message=message, code=code, status_code=400)
username = data.get('username', '').strip()
email = data.get('email', '').strip()
role = data.get('role', 0)
status = data.get('status', 1)
password = data.get('password', '')
try:
# 创建管理员
admin = Admin(
username=username,
email=email,
role=role,
status=status
)
admin.set_password(password)
db.session.add(admin)
db.session.commit()
# 记录审计日志
log_audit('CREATE', 'ADMIN', admin.admin_id, {
'username': username,
'role': role,
'status': status
})
return create_response(True, data=admin.to_dict(), message='管理员创建成功')
except Exception as e:
db.session.rollback()
current_app.logger.error(f"创建管理员失败: {str(e)}", exc_info=True)
return create_response(False, message='创建管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500)
@api_bp.route('/admins/<int:admin_id>', methods=['GET'])
@require_admin
@handle_exceptions
def get_admin(admin_id):
"""获取管理员详情"""
admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first()
if not admin:
return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404)
return create_response(True, data=admin.to_dict())
@api_bp.route('/admins/<int:admin_id>', methods=['PUT'])
@require_admin
@handle_exceptions
def update_admin(admin_id):
"""更新管理员"""
admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first()
if not admin:
return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404)
data = request.get_json()
if not data:
return create_response(False, message='请求数据为空', code=ResponseCode.VALIDATION_ERROR, status_code=400)
# 验证数据
data['admin_id'] = admin_id
is_valid, message, code = validate_admin_data(data, is_create=False)
if not is_valid:
return create_response(False, message=message, code=code, status_code=400)
try:
# 记录旧值
old_data = {
'email': admin.email,
'role': admin.role,
'status': admin.status
}
# 更新字段
if 'email' in data:
admin.email = data['email'].strip()
if 'role' in data:
admin.role = data['role']
if 'status' in data:
admin.status = data['status']
# 如果提供了密码,则更新密码
password = data.get('password', '')
if password and password.strip():
admin.set_password(password)
db.session.commit()
# 记录审计日志
log_audit('UPDATE', 'ADMIN', admin_id, {
'old': old_data,
'new': {
'email': admin.email,
'role': admin.role,
'status': admin.status
}
})
return create_response(True, data=admin.to_dict(), message='管理员更新成功')
except Exception as e:
db.session.rollback()
current_app.logger.error(f"更新管理员失败: {str(e)}", exc_info=True)
return create_response(False, message='更新管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500)
@api_bp.route('/admins/<int:admin_id>/toggle-status', methods=['POST'])
@require_admin
@handle_exceptions
def toggle_admin_status(admin_id):
"""切换管理员状态"""
admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first()
if not admin:
return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404)
# 不允许禁用自己
if current_user.admin_id == admin_id and admin.status == 1:
return create_response(False, message='不能禁用当前登录的管理员', code=ResponseCode.CANNOT_DISABLE_SELF, status_code=400)
try:
old_status = admin.status
admin.status = 0 if admin.status == 1 else 1
db.session.commit()
status_name = '正常' if admin.status == 1 else '禁用'
action = '启用' if admin.status == 1 else '禁用'
# 记录审计日志
log_audit('TOGGLE_STATUS', 'ADMIN', admin_id, {
'old_status': old_status,
'new_status': admin.status
})
return create_response(True, data={
'status': admin.status,
'status_name': status_name
}, message=f'管理员已{action}')
except Exception as e:
db.session.rollback()
current_app.logger.error(f"切换管理员状态失败: {str(e)}", exc_info=True)
return create_response(False, message='切换状态失败', code=ResponseCode.OPERATION_FAILED, status_code=500)
@api_bp.route('/admins/<int:admin_id>', methods=['DELETE'])
@require_admin
@handle_exceptions
def delete_admin(admin_id):
"""删除管理员(软删除)"""
admin = Admin.query.filter(Admin.admin_id == admin_id, Admin.is_deleted == 0).first()
if not admin:
return create_response(False, message='管理员不存在', code=ResponseCode.NOT_FOUND, status_code=404)
# 不允许删除自己
if current_user.admin_id == admin_id:
return create_response(False, message='不能删除当前登录的管理员', code=ResponseCode.CANNOT_DELETE_SELF, status_code=400)
try:
# 软删除
admin.soft_delete()
db.session.commit()
# 记录审计日志
log_audit('DELETE', 'ADMIN', admin_id, {
'username': admin.username
})
return create_response(True, message='管理员删除成功')
except Exception as e:
db.session.rollback()
current_app.logger.error(f"删除管理员失败: {str(e)}", exc_info=True)
return create_response(False, message='删除管理员失败', code=ResponseCode.OPERATION_FAILED, status_code=500)

View File

@ -243,26 +243,6 @@
}
}
function showNotification(message, type = 'info') {
const alertDiv = document.createElement('div');
alertDiv.className = `alert alert-${type} alert-dismissible fade show position-fixed`;
alertDiv.style.top = '20px';
alertDiv.style.right = '20px';
alertDiv.style.zIndex = '9999';
alertDiv.innerHTML = `
${message}
<button type="button" class="btn-close" data-bs-dismiss="alert"></button>
`;
document.body.appendChild(alertDiv);
setTimeout(() => {
if (alertDiv.parentNode) {
alertDiv.remove();
}
}, 5000);
}
function showAdminModal(admin = null) {
resetFormValidation();

21
check_admin_data.py Normal file
View File

@ -0,0 +1,21 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 设置数据库URL为SQLite
os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
from app import create_app, db
from app.models.admin import Admin
# 创建应用实例
app = create_app()
with app.app_context():
# 检查管理员数据
admins = Admin.query.all()
print(f"Found {len(admins)} admin users:")
for admin in admins:
print(f" - ID: {admin.admin_id}, Username: {admin.username}, Role: {admin.role}, Status: {admin.status}")

29
check_db_structure.py Normal file
View File

@ -0,0 +1,29 @@
import sqlite3
import os
# 连接到正确的数据库文件在instance目录中
db_path = os.path.join('instance', 'kamaxitong.db')
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 获取 admin 表的结构信息
cursor.execute('PRAGMA table_info(admin)')
columns = cursor.fetchall()
print('Admin table columns:')
for col in columns:
print(f" {col}")
# 检查是否有 is_deleted 字段
has_is_deleted = any(col[1] == 'is_deleted' for col in columns)
print(f"\nHas is_deleted column: {has_is_deleted}")
# 检查所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print(f"\nAll tables:")
for table in tables:
print(f" {table[0]}")
# 关闭连接
conn.close()

35
check_migration_status.py Normal file
View File

@ -0,0 +1,35 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 设置数据库URL为SQLite
os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
from app import create_app, db
# 创建应用实例
app = create_app()
with app.app_context():
# 检查当前迁移状态
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
# 获取迁移脚本目录
migrate_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'migrations')
script = ScriptDirectory(migrate_dir)
# 获取当前数据库的迁移状态
conn = db.engine.connect()
context = MigrationContext.configure(conn)
current_rev = context.get_current_revision()
print(f"Current database revision: {current_rev}")
# 获取所有迁移脚本
revisions = [rev.revision for rev in script.walk_revisions()]
print(f"Available revisions: {revisions}")
conn.close()

40
check_mysql_connection.py Normal file
View File

@ -0,0 +1,40 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app, db
# 创建应用实例
app = create_app()
with app.app_context():
try:
# 测试数据库连接
db.engine.connect()
print("✓ Database connection successful")
# 检查表结构
from sqlalchemy import inspect
inspector = inspect(db.engine)
tables = inspector.get_table_names()
print(f"Database tables: {tables}")
# 检查 admin 表结构
if 'admin' in tables:
columns = inspector.get_columns('admin')
print("Admin table columns:")
for col in columns:
print(f" - {col['name']}: {col['type']}")
# 检查是否有 is_deleted 字段
has_is_deleted = any(col['name'] == 'is_deleted' for col in columns)
print(f"Has is_deleted column: {has_is_deleted}")
else:
print("Admin table not found")
except Exception as e:
print(f"✗ Database connection failed: {e}")
import traceback
traceback.print_exc()

31
check_mysql_data.py Normal file
View File

@ -0,0 +1,31 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 在导入应用之前加载环境变量
try:
from dotenv import load_dotenv
if load_dotenv():
print("成功加载.env文件")
except ImportError:
print("python-dotenv未安装跳过.env文件加载")
from app import create_app, db
from app.models.admin import Admin
# 创建应用实例
app = create_app()
with app.app_context():
try:
# 检查管理员数据
admins = Admin.query.all()
print(f"Found {len(admins)} admin users:")
for admin in admins:
print(f" - ID: {admin.admin_id}, Username: {admin.username}, Role: {admin.role}, Status: {admin.status}")
except Exception as e:
print(f"Database query failed: {e}")
import traceback
traceback.print_exc()

50
check_mysql_structure.py Normal file
View File

@ -0,0 +1,50 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 在导入应用之前加载环境变量
try:
from dotenv import load_dotenv
if load_dotenv():
print("成功加载.env文件")
except ImportError:
print("python-dotenv未安装跳过.env文件加载")
from app import create_app, db
# 创建应用实例
app = create_app()
with app.app_context():
try:
# 检查表结构
from sqlalchemy import inspect
inspector = inspect(db.engine)
tables = inspector.get_table_names()
print(f"Database tables: {tables}")
# 检查 admin 表结构
if 'admin' in tables:
columns = inspector.get_columns('admin')
print("\nAdmin table columns:")
for col in columns:
print(f" - {col['name']}: {col['type']}")
# 检查是否有 is_deleted 字段
has_is_deleted = any(col['name'] == 'is_deleted' for col in columns)
print(f"\nHas is_deleted column: {has_is_deleted}")
else:
print("Admin table not found")
# 检查 audit_log 表
if 'audit_log' in tables:
print("\nAudit_log table exists")
else:
print("\nAudit_log table not found")
except Exception as e:
print(f"Database inspection failed: {e}")
import traceback
traceback.print_exc()

290
complete_fix.py Normal file
View File

@ -0,0 +1,290 @@
#!/usr/bin/env python3
"""
完整的系统修复脚本
修复数据库结构前端页面和后端接口的匹配问题
使用方法:
python complete_fix.py
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app, db
from app.models import Admin, AuditLog
def fix_database_structure():
"""修复数据库结构"""
print("\n" + "=" * 60)
print("1. 修复数据库结构")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 检查并修复admin表
print("\n📋 检查admin表结构...")
admin_columns = [c.name for c in Admin.__table__.columns]
print(f" 当前字段: {', '.join(admin_columns)}")
# 检查is_deleted字段
if 'is_deleted' not in admin_columns:
print(" ⚠️ 缺少 is_deleted 字段,正在添加...")
db.session.execute("ALTER TABLE admin ADD COLUMN is_deleted INT NOT NULL DEFAULT 0")
db.session.execute("CREATE INDEX ix_admin_is_deleted ON admin(is_deleted)")
print(" ✅ 已添加 is_deleted 字段")
else:
print(" ✅ is_deleted 字段已存在")
# 检查delete_time字段
if 'delete_time' not in admin_columns:
print(" ⚠️ 缺少 delete_time 字段,正在添加...")
db.session.execute("ALTER TABLE admin ADD COLUMN delete_time DATETIME NULL")
print(" ✅ 已添加 delete_time 字段")
else:
print(" ✅ delete_time 字段已存在")
db.session.commit()
# 创建audit_log表
print("\n📋 创建audit_log表...")
try:
# 测试表是否存在
AuditLog.query.count()
print(" ✅ audit_log 表已存在")
except Exception as e:
print(f" ⚠️ audit_log 表不存在,正在创建...")
db.session.execute("""
CREATE TABLE audit_log (
log_id INT AUTO_INCREMENT PRIMARY KEY,
admin_id INT NOT NULL,
action VARCHAR(32) NOT NULL,
target_type VARCHAR(32) NOT NULL,
target_id INT NULL,
details TEXT NULL,
ip_address VARCHAR(32) NULL,
user_agent VARCHAR(256) NULL,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (admin_id) REFERENCES admin(admin_id)
)
""")
db.session.execute("CREATE INDEX ix_audit_log_admin_id ON audit_log(admin_id)")
db.session.execute("CREATE INDEX ix_audit_log_action ON audit_log(action)")
db.session.execute("CREATE INDEX ix_audit_log_create_time ON audit_log(create_time)")
db.session.commit()
print(" ✅ audit_log 表创建成功")
# 验证修复结果
print("\n✅ 数据库结构修复完成!")
return True
except Exception as e:
db.session.rollback()
print(f"\n❌ 数据库修复失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def check_admin_accounts():
"""检查管理员账号"""
print("\n" + "=" * 60)
print("2. 检查管理员账号")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 查看所有管理员
admins = Admin.query.all()
print(f"\n📊 系统中共有 {len(admins)} 个管理员账号:")
if len(admins) == 0:
print(" ⚠️ 没有管理员账号!")
return False
for admin in admins:
print(f" - ID: {admin.admin_id}, 用户名: {admin.username}, 角色: {admin.role}, 状态: {admin.status}")
# 检查是否有超级管理员
super_admins = Admin.query.filter_by(role=1, status=1, is_deleted=0).all()
if len(super_admins) == 0:
print("\n ⚠️ 没有激活的超级管理员!")
return False
print(f"\n✅ 共有 {len(super_admins)} 个超级管理员可用")
return True
except Exception as e:
print(f"\n❌ 检查管理员账号失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def test_api_endpoints():
"""测试API端点"""
print("\n" + "=" * 60)
print("3. 测试API端点")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 测试导入admin API
from app.api.admin import ResponseCode, create_response
print("✅ admin API 模块导入成功")
# 测试验证函数
from app.api.admin import validate_username, validate_password, validate_email
print("✅ 验证函数导入成功")
# 测试响应格式
response = create_response(True, data={'test': 'data'}, message='测试', code=ResponseCode.SUCCESS)
print("✅ 统一响应格式正常")
print("\n✅ API端点测试完成")
return True
except Exception as e:
print(f"\n❌ API测试失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def check_templates():
"""检查前端模板"""
print("\n" + "=" * 60)
print("4. 检查前端模板")
print("=" * 60)
try:
# 检查关键模板文件
templates = {
'login.html': 'app/web/templates/login.html',
'admin_list.html': 'app/web/templates/admin/list.html',
'base.html': 'app/web/templates/base.html'
}
for name, path in templates.items():
if os.path.exists(path):
print(f"{name} 存在")
else:
print(f"{name} 不存在: {path}")
return False
# 检查list.html的内容
with open('app/web/templates/admin/list.html', 'r', encoding='utf-8') as f:
content = f.read()
if 'function()' in content and 'DOMContentLoaded' in content:
print(" ✅ admin list.html JavaScript代码正常")
else:
print(" ⚠️ admin list.html 可能缺少JavaScript代码")
print("\n✅ 前端模板检查完成")
return True
except Exception as e:
print(f"\n❌ 模板检查失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def create_test_super_admin():
"""创建测试超级管理员(如果需要)"""
print("\n" + "=" * 60)
print("5. 创建测试超级管理员")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 检查是否已有超级管理员
super_admin = Admin.query.filter_by(role=1, status=1, is_deleted=0).first()
if super_admin:
print(f" ✅ 已存在超级管理员: {super_admin.username}")
return True
print(" ⚠️ 没有超级管理员,创建默认账号...")
# 创建默认超级管理员
admin = Admin(
username='admin',
email='admin@example.com',
role=1,
status=1
)
admin.set_password('admin123456')
db.session.add(admin)
db.session.commit()
print(" ✅ 创建默认超级管理员成功")
print(" 📝 用户名: admin")
print(" 📝 密码: admin123456")
print(" ⚠️ 请登录后立即修改密码!")
return True
except Exception as e:
db.session.rollback()
print(f"\n❌ 创建超级管理员失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
print("\n" + "=" * 60)
print("🚀 KaMiXiTong 系统完整修复工具")
print("=" * 60)
# 检查config.py
if not os.path.exists('config.py'):
print("\n❌ 未找到 config.py 文件")
print("请确保在项目根目录运行此脚本\n")
return 1
success = True
# 1. 修复数据库结构
if not fix_database_structure():
success = False
# 2. 检查管理员账号
if not check_admin_accounts():
print("\n⚠️ 管理员账号有问题,尝试创建默认账号...")
if not create_test_super_admin():
success = False
# 3. 测试API端点
if not test_api_endpoints():
success = False
# 4. 检查前端模板
if not check_templates():
success = False
# 总结
print("\n" + "=" * 60)
if success:
print("🎉 系统修复完成!")
print("=" * 60)
print("\n✅ 可以启动应用了:")
print(" python run.py")
print("\n📖 如有问题请查看:")
print(" - REFACTOR_NOTES.md (重构说明)")
print(" - DEPLOYMENT_GUIDE.md (部署指南)")
return 0
else:
print("❌ 系统修复未完成,请查看错误信息")
print("=" * 60)
return 1
if __name__ == '__main__':
sys.exit(main())

47
debug_database_config.py Normal file
View File

@ -0,0 +1,47 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 在导入应用之前加载环境变量
try:
from dotenv import load_dotenv
if load_dotenv():
print("成功加载.env文件")
else:
print("未找到或无法加载.env文件")
except ImportError:
print("python-dotenv未安装跳过.env文件加载")
print("Environment variables:")
print(f"DATABASE_URL: {os.environ.get('DATABASE_URL', 'Not set')}")
from app import create_app, db
# 创建应用实例
app = create_app()
print("\nApp config:")
print(f"SQLALCHEMY_DATABASE_URI: {app.config.get('SQLALCHEMY_DATABASE_URI')}")
with app.app_context():
print("\nDatabase engine info:")
print(f"Engine URL: {db.engine.url}")
try:
# 测试数据库连接
connection = db.engine.connect()
print("✓ Database connection successful")
connection.close()
# 检查表结构
from sqlalchemy import inspect
inspector = inspect(db.engine)
tables = inspector.get_table_names()
print(f"Database tables: {tables}")
except Exception as e:
print(f"✗ Database connection failed: {e}")
import traceback
traceback.print_exc()

View File

@ -46,6 +46,9 @@ def init_sqlite_database():
print("\n2. 创建数据库表结构...")
db.create_all()
print(" ✓ 已创建所有数据表")
# 显式提交事务以确保表被创建
db.session.commit()
# 2.1 验证表创建
print("\n2.1 验证表创建...")
@ -60,6 +63,10 @@ def init_sqlite_database():
# 3. 插入初始数据
print("\n3. 插入初始数据...")
insert_sqlite_data()
# 显式提交并关闭会话
db.session.commit()
db.session.close()
# 4. 显示数据库信息
print("\n4. SQLite 数据库初始化完成!")
@ -68,7 +75,11 @@ def init_sqlite_database():
# 5. 显示数据库文件位置
db_path = os.path.abspath('kamaxitong.db')
print(f"\n5. 数据库文件位置: {db_path}")
print(f" 文件大小: {os.path.getsize(db_path)} 字节")
# 只有在文件存在时才显示大小
if os.path.exists(db_path):
print(f" 文件大小: {os.path.getsize(db_path)} 字节")
else:
print(" 文件不存在")
except Exception as e:
print(f"\n❌ SQLite 数据库初始化失败: {e}")

52
insert_admin_data.py Normal file
View File

@ -0,0 +1,52 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 设置数据库URL为SQLite
os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
from app import create_app, db
from app.models.admin import Admin
from werkzeug.security import generate_password_hash
# 创建应用实例
app = create_app()
with app.app_context():
# 检查是否已有管理员数据
admin_count = Admin.query.count()
if admin_count > 0:
print(f"Database already contains {admin_count} admin users. Skipping data insertion.")
sys.exit(0)
# 创建默认管理员账号
print("Creating default admin users...")
# 超级管理员
admin = Admin(
username='admin',
email='admin@kamaxitong.com',
role=1, # 超级管理员
status=1 # 正常
)
admin.set_password('admin123')
db.session.add(admin)
# 普通管理员
test_admin = Admin(
username='test_admin',
email='test@kamaxitong.com',
role=0, # 普通管理员
status=1 # 正常
)
test_admin.set_password('test123')
db.session.add(test_admin)
# 提交更改
db.session.commit()
print("✓ Created admin users:")
print(" - Super admin: admin / admin123")
print(" - Normal admin: test_admin / test123")

Binary file not shown.

View File

11
run.py
View File

@ -10,8 +10,15 @@ import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 注释掉强制设置数据库URL为SQLite的代码让应用从.env文件读取配置
# os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
# 尝试加载.env文件
try:
from dotenv import load_dotenv
if load_dotenv():
print("成功加载.env文件")
else:
print("未找到或无法加载.env文件")
except ImportError:
print("python-dotenv未安装跳过.env文件加载")
from app import create_app

24
run_migrations.py Normal file
View File

@ -0,0 +1,24 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 设置数据库URL为SQLite
os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
from app import create_app, db
from flask_migrate import upgrade
# 创建应用实例
app = create_app()
with app.app_context():
print("Running database migrations...")
try:
upgrade()
print("Migrations completed successfully!")
except Exception as e:
print(f"Migration failed: {e}")
import traceback
traceback.print_exc()

343
setup_mysql.py Normal file
View File

@ -0,0 +1,343 @@
#!/usr/bin/env python3
"""
MySQL数据库快速配置脚本
此脚本将帮助您
1. 检查MySQL配置
2. 创建数据库如果不存在
3. 测试数据库连接
4. 初始化表结构
5. 创建默认超级管理员如果需要
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app, db
from app.models import Admin
from werkzeug.security import generate_password_hash
def parse_database_url(url):
"""解析数据库URL"""
# 格式: mysql+pymysql://user:password@host:port/database
try:
# 移除mysql+pymysql://前缀
url = url.replace('mysql+pymysql://', '')
# 找到@符号前的用户名密码
at_index = url.index('@')
user_pass = url[:at_index]
url = url[at_index + 1:]
# 分割用户名和密码
user, password = user_pass.split(':', 1)
# 找到数据库名
if '/' in url:
host_port, database = url.split('/', 1)
# 提取端口
if ':' in host_port:
host, port = host_port.split(':', 1)
else:
host = host_port
port = '3306'
else:
raise ValueError("URL格式不正确")
return {
'user': user,
'password': password,
'host': host,
'port': port,
'database': database
}
except Exception as e:
print(f"❌ 无法解析数据库URL: {e}")
return None
def check_mysql_dependencies():
"""检查MySQL依赖"""
print("\n" + "=" * 60)
print("1. 检查MySQL依赖")
print("=" * 60)
try:
import pymysql
print(f"✅ PyMySQL已安装 (版本: {pymysql.__version__})")
return True
except ImportError:
print("❌ PyMySQL未安装")
print("请运行: pip install PyMySQL")
return False
def check_dotenv():
"""检查.env文件加载"""
print("\n" + "=" * 60)
print("2. 检查.env文件")
print("=" * 60)
env_file = '.env'
if os.path.exists(env_file):
print(f"✅ 找到.env文件: {os.path.abspath(env_file)}")
# 读取DATABASE_URL
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith('DATABASE_URL='):
url = line.split('=', 1)[1].strip()
print(f"✅ DATABASE_URL已配置: {url}")
return url
print("⚠️ .env文件中未找到DATABASE_URL")
return None
else:
print(f"❌ 未找到.env文件: {os.path.abspath(env_file)}")
return None
def test_database_connection():
"""测试数据库连接"""
print("\n" + "=" * 60)
print("3. 测试数据库连接")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 测试连接
db.create_all()
print("✅ 数据库连接成功!")
return True
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
return False
def create_database():
"""创建数据库(如果不存在)"""
print("\n" + "=" * 60)
print("4. 创建数据库")
print("=" * 60)
# 从.env文件获取数据库配置
env_file = '.env'
if not os.path.exists(env_file):
print("❌ .env文件不存在")
return False
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith('DATABASE_URL='):
db_url = line.split('=', 1)[1].strip()
break
if not db_url:
print("❌ 未找到DATABASE_URL配置")
return False
# 解析URL
db_config = parse_database_url(db_url)
if not db_config:
return False
print(f"数据库信息:")
print(f" 主机: {db_config['host']}")
print(f" 端口: {db_config['port']}")
print(f" 数据库: {db_config['database']}")
print(f" 用户: {db_config['user']}")
try:
import pymysql
# 连接到MySQL服务器不指定数据库
connection = pymysql.connect(
host=db_config['host'],
port=int(db_config['port']),
user=db_config['user'],
password=db_config['password'],
charset='utf8mb4'
)
with connection.cursor() as cursor:
# 检查数据库是否存在
cursor.execute(f"SHOW DATABASES LIKE '{db_config['database']}'")
result = cursor.fetchone()
if not result:
print(f"📦 正在创建数据库 '{db_config['database']}'...")
cursor.execute(
f"CREATE DATABASE {db_config['database']} "
"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
)
print(f"✅ 数据库 '{db_config['database']}' 创建成功")
else:
print(f"✅ 数据库 '{db_config['database']}' 已存在")
connection.close()
return True
except ImportError:
print("❌ PyMySQL未安装请运行: pip install PyMySQL")
return False
except Exception as e:
print(f"❌ 创建数据库失败: {e}")
print("请手动创建数据库或检查MySQL服务是否运行")
return False
def init_database():
"""初始化数据库表"""
print("\n" + "=" * 60)
print("5. 初始化数据库表")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 检查admin表
admin_columns = [c.name for c in Admin.__table__.columns]
print(f"📋 Admin表字段: {', '.join(admin_columns)}")
# 检查新字段
if 'is_deleted' not in admin_columns:
print("⚠️ 缺少软删除字段,正在添加...")
db.session.execute("ALTER TABLE admin ADD COLUMN is_deleted INT NOT NULL DEFAULT 0")
db.session.execute("ALTER TABLE admin ADD COLUMN delete_time DATETIME NULL")
db.session.execute("CREATE INDEX ix_admin_is_deleted ON admin(is_deleted)")
db.session.commit()
print("✅ 软删除字段添加成功")
# 检查audit_log表
try:
from app.models import AuditLog
AuditLog.query.count()
print("✅ audit_log表存在")
except Exception as e:
print(f"⚠️ audit_log表不存在正在创建...")
db.session.execute("""
CREATE TABLE audit_log (
log_id INT AUTO_INCREMENT PRIMARY KEY,
admin_id INT NOT NULL,
action VARCHAR(32) NOT NULL,
target_type VARCHAR(32) NOT NULL,
target_id INT NULL,
details TEXT NULL,
ip_address VARCHAR(32) NULL,
user_agent VARCHAR(256) NULL,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (admin_id) REFERENCES admin(admin_id)
)
""")
db.session.execute("CREATE INDEX ix_audit_log_admin_id ON audit_log(admin_id)")
db.session.execute("CREATE INDEX ix_audit_log_action ON audit_log(action)")
db.session.execute("CREATE INDEX ix_audit_log_create_time ON audit_log(create_time)")
db.session.commit()
print("✅ audit_log表创建成功")
print("✅ 数据库表初始化成功")
return True
except Exception as e:
db.session.rollback()
print(f"❌ 数据库表初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def create_default_admin():
"""创建默认超级管理员(如果需要)"""
print("\n" + "=" * 60)
print("6. 创建默认超级管理员")
print("=" * 60)
app = create_app()
with app.app_context():
try:
# 检查是否已有超级管理员
super_admin = Admin.query.filter_by(role=1, status=1, is_deleted=0).first()
if super_admin:
print(f"✅ 已存在超级管理员: {super_admin.username}")
return True
print("📝 没有超级管理员,创建默认账号...")
print(" 用户名: admin")
print(" 密码: admin123456")
print(" ⚠️ 登录后请立即修改密码!")
admin = Admin(
username='admin',
email='admin@example.com',
role=1,
status=1
)
admin.set_password('admin123456')
db.session.add(admin)
db.session.commit()
print("✅ 默认超级管理员创建成功")
return True
except Exception as e:
db.session.rollback()
print(f"❌ 创建超级管理员失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
print("\n" + "=" * 60)
print("🚀 MySQL数据库快速配置工具")
print("=" * 60)
success = True
# 1. 检查依赖
if not check_mysql_dependencies():
print("\n⚠️ 请先安装PyMySQL: pip install PyMySQL")
return 1
# 2. 检查.env文件
db_url = check_dotenv()
if not db_url:
print("\n⚠️ 请配置.env文件中的DATABASE_URL")
return 1
# 3. 创建数据库
if not create_database():
success = False
# 4. 测试连接
if success and not test_database_connection():
success = False
# 5. 初始化表
if success and not init_database():
success = False
# 6. 创建默认管理员
if success and not create_default_admin():
success = False
# 总结
print("\n" + "=" * 60)
if success:
print("🎉 MySQL配置完成")
print("=" * 60)
print("\n✅ 现在可以启动应用:")
print(" python run.py")
print("\n📖 默认管理员登录信息:")
print(" 用户名: admin")
print(" 密码: admin123456")
print(" ⚠️ 登录后请立即修改密码!")
return 0
else:
print("❌ 配置未完成,请查看错误信息")
print("=" * 60)
return 1
if __name__ == '__main__':
sys.exit(main())

36
simple_init_db.py Normal file
View File

@ -0,0 +1,36 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 设置数据库URL为SQLite
os.environ['DATABASE_URL'] = 'sqlite:///kamaxitong.db'
from app import create_app, db
from app.models import Admin, Product, Version, License, Device, Ticket
# 创建应用实例
app = create_app()
with app.app_context():
print("Dropping all tables...")
db.drop_all()
print("Creating all tables...")
db.create_all()
print("Committing changes...")
db.session.commit()
# 验证表创建
from sqlalchemy import inspect
inspector = inspect(db.engine)
tables = inspector.get_table_names()
print(f"Created tables: {tables}")
# 显式关闭数据库连接以确保数据写入磁盘
db.session.close()
db.engine.dispose()
print("Database initialization completed!")

19
test_dotenv.py Normal file
View File

@ -0,0 +1,19 @@
import os
from dotenv import load_dotenv
print("Before loading .env:")
print(f"DATABASE_URL: {os.environ.get('DATABASE_URL', 'Not set')}")
# 加载.env文件
result = load_dotenv()
print(f"load_dotenv() result: {result}")
print("After loading .env:")
print(f"DATABASE_URL: {os.environ.get('DATABASE_URL', 'Not set')}")
# 检查当前目录
print(f"Current directory: {os.getcwd()}")
print("Files in current directory:")
for file in os.listdir('.'):
if file.startswith('.env'):
print(f" {file}")

84
update_mysql_schema.py Normal file
View File

@ -0,0 +1,84 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 在导入应用之前加载环境变量
try:
from dotenv import load_dotenv
if load_dotenv():
print("成功加载.env文件")
except ImportError:
print("python-dotenv未安装跳过.env文件加载")
from app import create_app, db
from sqlalchemy import text
# 创建应用实例
app = create_app()
with app.app_context():
try:
# 检查是否需要添加 is_deleted 字段
result = db.session.execute(text("SHOW COLUMNS FROM admin LIKE 'is_deleted'"))
if result.fetchone() is None:
print("Adding is_deleted column to admin table...")
db.session.execute(text("ALTER TABLE admin ADD COLUMN is_deleted INTEGER NOT NULL DEFAULT 0"))
print("✓ Added is_deleted column")
else:
print("is_deleted column already exists")
# 检查是否需要添加 delete_time 字段
result = db.session.execute(text("SHOW COLUMNS FROM admin LIKE 'delete_time'"))
if result.fetchone() is None:
print("Adding delete_time column to admin table...")
db.session.execute(text("ALTER TABLE admin ADD COLUMN delete_time DATETIME NULL"))
print("✓ Added delete_time column")
else:
print("delete_time column already exists")
# 检查是否需要添加 phone 字段(如果不存在)
result = db.session.execute(text("SHOW COLUMNS FROM admin LIKE 'phone'"))
if result.fetchone() is None:
print("Adding phone column to admin table...")
db.session.execute(text("ALTER TABLE admin ADD COLUMN phone VARCHAR(16) NULL"))
print("✓ Added phone column")
else:
print("phone column already exists")
# 创建 audit_log 表(如果不存在)
result = db.session.execute(text("SHOW TABLES LIKE 'audit_log'"))
if result.fetchone() is None:
print("Creating audit_log table...")
db.session.execute(text("""
CREATE TABLE audit_log (
log_id INTEGER NOT NULL AUTO_INCREMENT,
admin_id INTEGER NOT NULL,
action VARCHAR(32) NOT NULL,
target_type VARCHAR(32) NOT NULL,
target_id INTEGER,
details TEXT,
ip_address VARCHAR(32),
user_agent VARCHAR(256),
create_time DATETIME NOT NULL,
PRIMARY KEY (log_id),
FOREIGN KEY (admin_id) REFERENCES admin (admin_id)
)
"""))
# 创建索引
db.session.execute(text("CREATE INDEX ix_audit_log_admin_id ON audit_log (admin_id)"))
db.session.execute(text("CREATE INDEX ix_audit_log_action ON audit_log (action)"))
db.session.execute(text("CREATE INDEX ix_audit_log_create_time ON audit_log (create_time)"))
print("✓ Created audit_log table")
else:
print("audit_log table already exists")
# 提交所有更改
db.session.commit()
print("\n✓ Database schema update completed!")
except Exception as e:
print(f"Database schema update failed: {e}")
import traceback
traceback.print_exc()

223
verify_mysql_config.py Normal file
View File

@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
验证MySQL配置脚本
检查
1. .env文件是否存在
2. DATABASE_URL配置是否正确
3. MySQL连接是否正常
4. 数据库表结构是否正确
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def check_env_file():
"""检查.env文件"""
print("\n" + "=" * 60)
print("检查.env文件")
print("=" * 60)
env_file = '.env'
if not os.path.exists(env_file):
print(f"{env_file} 文件不存在")
print(f" 请确保在项目根目录创建 {env_file} 文件")
return None
print(f"{env_file} 文件存在")
# 读取DATABASE_URL
db_url = None
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line.startswith('DATABASE_URL='):
db_url = line.split('=', 1)[1].strip()
print(f"✅ DATABASE_URL已配置")
print(f" {db_url}")
break
if not db_url:
print("❌ 未找到DATABASE_URL配置")
print(" 请在.env文件中添加: DATABASE_URL=mysql+pymysql://...")
return db_url
def check_dependencies():
"""检查依赖"""
print("\n" + "=" * 60)
print("检查依赖")
print("=" * 60)
missing = []
try:
import pymysql
print(f"✅ PyMySQL (版本: {pymysql.__version__})")
except ImportError:
print("❌ PyMySQL 未安装")
missing.append("PyMySQL")
try:
import dotenv
print(f"✅ python-dotenv (版本: {dotenv.__version__})")
except ImportError:
print("❌ python-dotenv 未安装")
missing.append("python-dotenv")
if missing:
print(f"\n⚠️ 请安装缺失的依赖:")
print(f" pip install {' '.join(missing)}")
return False
return True
def test_database():
"""测试数据库"""
print("\n" + "=" * 60)
print("测试数据库连接")
print("=" * 60)
try:
from app import create_app, db
app = create_app()
with app.app_context():
# 尝试连接
print("🔄 正在连接数据库...")
db.create_all()
print("✅ 数据库连接成功!")
# 显示数据库信息
print(f"\n数据库信息:")
print(f" URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
# 检查表
print(f"\n检查数据库表:")
from sqlalchemy import inspect
inspector = inspect(db.engine)
tables = inspector.get_table_names()
print(f" 数据库表数量: {len(tables)}")
for table in tables:
print(f" - {table}")
return True
except Exception as e:
print(f"❌ 数据库连接失败")
print(f" 错误: {e}")
print(f"\n请检查:")
print(f" 1. MySQL服务是否运行")
print(f" 2. .env文件中的DATABASE_URL是否正确")
print(f" 3. 用户名和密码是否正确")
print(f" 4. 数据库是否存在")
return False
def check_admin_model():
"""检查Admin模型"""
print("\n" + "=" * 60)
print("检查Admin模型")
print("=" * 60)
try:
from app import create_app
from app.models import Admin
app = create_app()
with app.app_context():
# 检查字段
columns = [c.name for c in Admin.__table__.columns]
print(f"Admin表字段 ({len(columns)}个):")
for col in sorted(columns):
print(f" - {col}")
# 检查新字段
missing = []
if 'is_deleted' not in columns:
missing.append('is_deleted')
if 'delete_time' not in columns:
missing.append('delete_time')
if missing:
print(f"\n⚠️ 缺少字段: {', '.join(missing)}")
print(f" 请运行: python setup_mysql.py")
return False
print("\n✅ Admin模型结构正确")
return True
except Exception as e:
print(f"❌ 检查Admin模型失败: {e}")
return False
def show_config_example():
"""显示配置示例"""
print("\n" + "=" * 60)
print("配置示例")
print("=" * 60)
print("\n.env文件示例:")
print("-" * 60)
print("""# 环境配置
FLASK_ENV=development
FLASK_DEBUG=True
# 数据库配置 - MySQL
DATABASE_URL=mysql+pymysql://root:你的密码@localhost/kamaxitong
# 安全配置
SECRET_KEY=taiyi1224
AUTH_SECRET_KEY=taiyi1224
""")
print("-" * 60)
def main():
"""主函数"""
print("\n" + "=" * 60)
print("🔍 MySQL配置验证工具")
print("=" * 60)
success = True
# 1. 检查.env文件
db_url = check_env_file()
if not db_url:
show_config_example()
return 1
# 2. 检查依赖
if not check_dependencies():
success = False
# 3. 测试数据库
if success and not test_database():
success = False
# 4. 检查Admin模型
if success and not check_admin_model():
success = False
# 总结
print("\n" + "=" * 60)
if success:
print("✅ MySQL配置验证通过")
print("=" * 60)
print("\n📖 下一步:")
print(" python run.py")
print("\n🔑 默认管理员:")
print(" 用户名: admin")
print(" 密码: admin123456")
return 0
else:
print("❌ MySQL配置验证失败")
print("=" * 60)
print("\n💡 尝试自动修复:")
print(" python setup_mysql.py")
return 1
if __name__ == '__main__':
sys.exit(main())