Exeprotector/database.py
2025-10-23 18:27:33 +08:00

630 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
作者:太一
微信taiyi1224
邮箱shoubo1224@qq.com
"""
from typing import Tuple, List, Dict, Optional
import mysql.connector
from mysql.connector import Error
import uuid
from datetime import datetime, timedelta
import hashlib
import os
# 导入文件管理器
try:
from file_manager import FileManager
except ImportError:
FileManager = None
class LicenseDatabase:
def __init__(self, host=None, database=None, user=None, password=None):
self.host = host or os.environ.get('DB_HOST', 'taiyiagi.xyz')
self.database = database or os.environ.get('DB_NAME', 'filesend_db')
self.user = user or os.environ.get('DB_USER', 'taiyi')
self.password = password or os.environ.get('DB_PASSWORD', 'taiyi1224')
self.connection = None
# 初始化文件管理器
if FileManager:
self.file_manager = FileManager()
else:
self.file_manager = None
def connect(self):
"""连接到数据库"""
try:
self.connection = mysql.connector.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password
)
if self.connection.is_connected():
return True
return False
except Error as e:
print(f"数据库连接错误: {e}")
return False
def create_tables(self):
"""创建必要的数据库表"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False
try:
cursor = self.connection.cursor()
# 创建软件产品表
cursor.execute('''
CREATE TABLE IF NOT EXISTS software_products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
description TEXT,
version VARCHAR(50),
exe_path VARCHAR(500),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# 检查并添加缺失的字段
try:
# 检查version字段是否存在
cursor.execute("SHOW COLUMNS FROM software_products LIKE 'version'")
if not cursor.fetchone():
cursor.execute("ALTER TABLE software_products ADD COLUMN version VARCHAR(50) AFTER description")
print("成功添加version字段")
except Error as e:
print(f"检查version字段时出错: {e}")
try:
# 检查exe_path字段是否存在
cursor.execute("SHOW COLUMNS FROM software_products LIKE 'exe_path'")
if not cursor.fetchone():
cursor.execute("ALTER TABLE software_products ADD COLUMN exe_path VARCHAR(500) AFTER version")
print("成功添加exe_path字段")
except Error as e:
print(f"检查exe_path字段时出错: {e}")
# 创建卡密表
cursor.execute('''
CREATE TABLE IF NOT EXISTS license_keys (
id INT AUTO_INCREMENT PRIMARY KEY,
key_code VARCHAR(50) NOT NULL UNIQUE,
software_id INT NOT NULL,
machine_code VARCHAR(100) DEFAULT NULL,
start_time DATETIME DEFAULT NULL,
end_time DATETIME NOT NULL,
status ENUM('unused', 'active', 'expired', 'banned') DEFAULT 'unused',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (software_id) REFERENCES software_products(id)
)
''')
self.connection.commit()
cursor.close()
return True
except Error as e:
print(f"创建表错误: {e}")
return False
def generate_key(self, days_valid, software_id):
"""生成一个新的卡密并保存到数据库"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return None
try:
# 生成UUID作为基础然后进行哈希处理
key_uuid = uuid.uuid4().hex
hash_obj = hashlib.sha256(key_uuid.encode())
# 取前20个字符作为卡密
key_code = hash_obj.hexdigest()[:20].upper()
# 格式化卡密每5个字符一组
formatted_key = '-'.join([key_code[i:i + 5] for i in range(0, len(key_code), 5)])
# 计算过期时间
end_time = datetime.now() + timedelta(days=days_valid)
cursor = self.connection.cursor()
query = """
INSERT INTO license_keys (key_code, software_id, end_time)
VALUES (%s, %s, %s)
"""
cursor.execute(query, (formatted_key, software_id, end_time))
self.connection.commit()
cursor.close()
return formatted_key
except Error as e:
print(f"生成卡密错误: {e}")
return None
def validate_key(self, key_code, machine_code, software_id):
"""验证卡密是否有效,并绑定机器码 - 严格一机一码"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
cursor = self.connection.cursor(dictionary=True)
# 查询卡密信息
query = "SELECT * FROM license_keys WHERE key_code = %s AND software_id = %s"
cursor.execute(query, (key_code, software_id))
key_info = cursor.fetchone()
if not key_info:
cursor.close()
return False, "无效的激活码"
# 检查卡密状态
if key_info['status'] == 'banned':
cursor.close()
return False, "激活码已被封禁"
if key_info['status'] == 'expired':
cursor.close()
return False, "激活码已过期"
# 一机一码严格检查:每个激活码只能在一台机器上使用
if key_info['status'] == 'active':
if key_info['machine_code'] != machine_code:
# 这个码已经用过了,不能再次使用
cursor.close()
return False, f"此激活码已在设备{key_info['machine_code'][:8]}...上使用,一个激活码只能在一台设备上使用一次"
else:
# 已经激活过这台机器,验证是否过期
if datetime.now() > key_info['end_time']:
update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s"
cursor.execute(update_query, (key_code,))
self.connection.commit()
cursor.close()
return False, "激活码已过期"
else:
cursor.close()
return True, "此设备已激活,继续使用"
# 首次激活:验证通过后绑定到机器
if key_info['status'] == 'unused':
update_query = """
UPDATE license_keys
SET status = 'active', machine_code = %s, start_time = %s
WHERE key_code = %s AND status = 'unused'
"""
cursor.execute(update_query, (machine_code, datetime.now(), key_code))
rows_affected = cursor.rowcount
self.connection.commit()
if rows_affected == 0:
# 可能已经被其他并发操作激活
cursor.close()
return False, "激活码已被使用,请使用新的激活码"
# 再次检查是否过期(防止并发问题)
final_check_query = "SELECT * FROM license_keys WHERE key_code = %s"
cursor.execute(final_check_query, (key_code,))
final_info = cursor.fetchone()
if final_info and datetime.now() > final_info['end_time']:
update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s"
cursor.execute(update_query, (key_code,))
self.connection.commit()
cursor.close()
return False, "激活码已过期"
cursor.close()
return True, "激活成功"
except Error as e:
print(f"验证激活码错误: {e}")
return False, f"验证过程出错: {str(e)}"
def get_all_keys(self, software_id=None):
"""获取所有卡密信息可按软件ID筛选"""
if not self.connection or not self.connection.is_connected():
print("警告: 数据库未连接,尝试重新连接...")
if not self.connect():
print("错误: 数据库重连失败")
return []
cursor = None
try:
cursor = self.connection.cursor(dictionary=True)
# 先检查license_keys表是否存在
try:
cursor.execute("SHOW TABLES LIKE 'license_keys'")
if not cursor.fetchone():
print("警告: license_keys表不存在请先创建数据库表")
if cursor:
cursor.close()
return []
except Error as e:
print(f"检查表存在性时出错: {e}")
if cursor:
cursor.close()
return []
# 检查software_products表是否存在以及字段是否完整
use_join = False
try:
cursor.execute("SHOW TABLES LIKE 'software_products'")
if cursor.fetchone():
# 表存在,检查字段
cursor.execute("SHOW COLUMNS FROM software_products")
columns = [row['Field'] for row in cursor.fetchall()]
if 'name' in columns: # 如果有name字段就使用JOIN查询
use_join = True
except Error as e:
print(f"检查software_products表时出错: {e}")
use_join = False
# 构建并执行查询
try:
if use_join:
if software_id:
query = """
SELECT lk.*, COALESCE(sp.name, 'Unknown') as software_name
FROM license_keys lk
LEFT JOIN software_products sp ON lk.software_id = sp.id
WHERE lk.software_id = %s
ORDER BY lk.created_at DESC
"""
cursor.execute(query, (software_id,))
else:
query = """
SELECT lk.*, COALESCE(sp.name, 'Unknown') as software_name
FROM license_keys lk
LEFT JOIN software_products sp ON lk.software_id = sp.id
ORDER BY lk.created_at DESC
"""
cursor.execute(query)
else:
# 不使用JOIN只查询license_keys表
if software_id:
query = "SELECT *, 'Unknown' as software_name FROM license_keys WHERE software_id = %s ORDER BY created_at DESC"
cursor.execute(query, (software_id,))
else:
query = "SELECT *, 'Unknown' as software_name FROM license_keys ORDER BY created_at DESC"
cursor.execute(query)
keys = cursor.fetchall()
# 确保返回的是列表
if keys is None:
keys = []
if len(keys) == 0:
print("提示: 数据库中暂无卡密记录")
else:
print(f"成功从数据库获取 {len(keys)} 个卡密记录")
if cursor:
cursor.close()
return keys
except Error as e:
print(f"执行查询时出错: {e}")
if cursor:
cursor.close()
return []
except Error as e:
print(f"获取卡密列表错误: {type(e).__name__}: {e}")
if cursor:
cursor.close()
return []
except Exception as e:
print(f"获取卡密列表时发生未知错误: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
if cursor:
cursor.close()
return []
def update_key_status(self, key_code, status):
"""更新卡密状态"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False
try:
cursor = self.connection.cursor()
query = "UPDATE license_keys SET status = %s WHERE key_code = %s"
cursor.execute(query, (status, key_code))
self.connection.commit()
cursor.close()
return True
except Error as e:
print(f"更新卡密状态错误: {e}")
return False
def release_key(self, key_code):
"""释放已使用的激活码 - 将其重置为未使用状态,清空机器码"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
cursor = self.connection.cursor(dictionary=True)
# 检查卡密是否存在且处于已激活状态
check_query = "SELECT * FROM license_keys WHERE key_code = %s"
cursor.execute(check_query, (key_code,))
key_info = cursor.fetchone()
if not key_info:
cursor.close()
return False, "激活码不存在"
if key_info['status'] != 'active':
cursor.close()
return False, f"激活码处于 {key_info['status']} 状态,只能释放已使用的激活码"
# 释放激活码:重置为未使用状态,清空机器码和开始时间
release_query = """
UPDATE license_keys
SET status = 'unused', machine_code = NULL, start_time = NULL
WHERE key_code = %s
"""
cursor.execute(release_query, (key_code,))
rows_affected = cursor.rowcount
self.connection.commit()
cursor.close()
if rows_affected > 0:
return True, "激活码已释放,可以重新使用"
else:
return False, "释放激活码失败"
except Error as e:
print(f"释放激活码错误: {e}")
return False, f"释放过程出错: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.connection and self.connection.is_connected():
self.connection.close()
def unbind_key(self, key_code: str) -> Tuple[bool, str]:
"""强制解除卡密与机器码的绑定"""
if not self.connection:
return False, "数据库未连接"
try:
cursor = self.connection.cursor(dictionary=True)
cursor.execute(
"UPDATE license_keys SET status='unused', machine_code=NULL, start_time=NULL WHERE key_code=%s",
(key_code,))
self.connection.commit()
rows = cursor.rowcount
cursor.close()
return (True, "已解绑并释放") if rows else (False, "卡密不存在或状态异常")
except Exception as e:
return False, str(e)
def add_software_product_with_file(self, name: str, description: str = "", version: str = "",
exe_source_path: str = "") -> Tuple[bool, str]:
"""添加软件产品并上传exe文件"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
# 如果提供了exe文件路径尝试上传
local_exe_path = ""
if exe_source_path and self.file_manager:
success, msg, relative_path = self.file_manager.upload_exe_file(
exe_source_path, name
)
if success:
local_exe_path = relative_path
else:
return False, f"文件上传失败: {msg}"
elif exe_source_path:
# 没有文件管理器,使用原来的逻辑
local_exe_path = exe_source_path
cursor = self.connection.cursor()
query = """
INSERT INTO software_products (name, description, version, exe_path)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(query, (name, description, version, local_exe_path))
self.connection.commit()
cursor.close()
if exe_source_path and self.file_manager:
return True, f"软件产品添加成功!\n\n✅ 文件已自动保存到本地目录\n目录: {self.file_manager.exe_dir}\n文件名: {local_exe_path}\n\n现在你的EXE文件已经被安全地存储在程序专属目录中不用担心文件移动问题了"
else:
return True, "软件产品添加成功"
except Error as e:
if "Duplicate entry" in str(e):
return False, "软件产品已存在"
return False, f"添加软件产品失败: {str(e)}"
def get_exe_full_path(self, relative_path: str) -> str:
"""获取exe文件的完整路径"""
if not relative_path:
return ""
# 如果是绝对路径,直接返回
if os.path.isabs(relative_path):
return relative_path
# 如果有文件管理器,使用它获取路径
if self.file_manager:
return self.file_manager.get_exe_full_path(relative_path)
else:
# 退回到相对路径逻辑
base_dir = os.path.dirname(__file__)
return os.path.join(base_dir, "files", "executables", relative_path)
def delete_software_product_with_file(self, product_id: int) -> Tuple[bool, str]:
"""删除软件产品并删除关联的exe文件"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
cursor = self.connection.cursor(dictionary=True)
# 获取软件信息
cursor.execute("SELECT * FROM software_products WHERE id = %s", (product_id,))
product = cursor.fetchone()
if not product:
cursor.close()
return False, "软件产品不存在"
# 检查是否有关联的卡密
cursor.execute("SELECT COUNT(*) FROM license_keys WHERE software_id = %s", (product_id,))
key_count = cursor.fetchone()['COUNT(*)']
if key_count > 0:
cursor.close()
return False, f"无法删除,还有 {key_count} 个关联的卡密"
# 删除数据库记录
cursor.execute("DELETE FROM software_products WHERE id = %s", (product_id,))
self.connection.commit()
cursor.close()
# 删除关联的exe文件
exe_path = product.get('exe_path', '')
if exe_path and self.file_manager:
# 如果是相对路径,删除文件
if not os.path.isabs(exe_path):
file_success, file_msg = self.file_manager.delete_exe_file(exe_path)
if not file_success:
return True, f"软件产品删除成功,但文件删除失败: {file_msg}"
else:
return True, f"软件产品和文件删除成功: {file_msg}"
return True, "软件产品删除成功"
except Error as e:
return False, f"删除软件产品失败: {str(e)}"
def get_file_storage_info(self) -> Dict:
"""获取文件存储信息"""
if self.file_manager:
return self.file_manager.get_storage_info()
else:
return {
'total_files': 0,
'total_size': 0,
'existing_files': 0,
'missing_files': 0,
'base_dir': '文件管理器未初始化',
'exe_dir': '',
'backup_dir': ''
}
def get_software_products(self) -> List[Dict]:
"""获取所有软件产品列表"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return []
try:
cursor = self.connection.cursor(dictionary=True)
query = "SELECT * FROM software_products ORDER BY name"
cursor.execute(query)
products = cursor.fetchall()
cursor.close()
return products
except Error as e:
print(f"获取软件产品列表错误: {e}")
return []
def get_software_by_name(self, name: str) -> Optional[Dict]:
"""根据名称获取软件产品信息"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return None
try:
cursor = self.connection.cursor(dictionary=True)
query = "SELECT * FROM software_products WHERE name = %s"
cursor.execute(query, (name,))
product = cursor.fetchone()
cursor.close()
return product
except Error as e:
print(f"获取软件产品信息错误: {e}")
return None
def update_software_product(self, product_id: int, name: str = None, description: str = None,
version: str = None, exe_path: str = None) -> Tuple[bool, str]:
"""更新软件产品信息"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
cursor = self.connection.cursor()
# 构建更新语句
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if description is not None:
updates.append("description = %s")
params.append(description)
if version is not None:
updates.append("version = %s")
params.append(version)
if exe_path is not None:
updates.append("exe_path = %s")
params.append(exe_path)
if not updates:
return False, "没有提供更新内容"
query = f"UPDATE software_products SET {', '.join(updates)} WHERE id = %s"
params.append(product_id)
cursor.execute(query, params)
self.connection.commit()
cursor.close()
return True, "软件产品更新成功"
except Error as e:
if "Duplicate entry" in str(e):
return False, "软件名称已存在"
return False, f"更新软件产品失败: {str(e)}"
def delete_software_product(self, product_id: int) -> Tuple[bool, str]:
"""删除软件产品"""
if not self.connection or not self.connection.is_connected():
if not self.connect():
return False, "数据库连接失败"
try:
cursor = self.connection.cursor()
# 检查是否有关联的卡密
cursor.execute("SELECT COUNT(*) FROM license_keys WHERE software_id = %s", (product_id,))
key_count = cursor.fetchone()[0]
if key_count > 0:
cursor.close()
return False, f"无法删除,还有 {key_count} 个关联的卡密"
cursor.execute("DELETE FROM software_products WHERE id = %s", (product_id,))
self.connection.commit()
cursor.close()
return True, "软件产品删除成功"
except Error as e:
return False, f"删除软件产品失败: {str(e)}"