""" 作者:太一 微信:taiyi1224 邮箱:shoubo1224@qq.com """ from typing import Tuple, List, Dict, Optional import mysql.connector from mysql.connector import Error import uuid from datetime import datetime, timedelta import hashlib import os # 导入文件管理器 try: from file_manager import FileManager except ImportError: FileManager = None class LicenseDatabase: def __init__(self, host=None, database=None, user=None, password=None): self.host = host or os.environ.get('DB_HOST', 'localhost') self.database = database or os.environ.get('DB_NAME', 'exeprotector') self.user = user or os.environ.get('DB_USER', 'root') self.password = password or os.environ.get('DB_PASSWORD', 'taiyi1224') self.connection = None # 初始化文件管理器 if FileManager: self.file_manager = FileManager() else: self.file_manager = None def connect(self): """连接到数据库""" try: self.connection = mysql.connector.connect( host=self.host, database=self.database, user=self.user, password=self.password ) if self.connection.is_connected(): return True return False except Error as e: print(f"数据库连接错误: {e}") return False def create_tables(self): """创建必要的数据库表""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False try: cursor = self.connection.cursor() # 创建软件产品表 cursor.execute(''' CREATE TABLE IF NOT EXISTS software_products ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL UNIQUE, description TEXT, version VARCHAR(50), exe_path VARCHAR(500), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # 检查并添加缺失的字段 try: # 检查version字段是否存在 cursor.execute("SHOW COLUMNS FROM software_products LIKE 'version'") if not cursor.fetchone(): cursor.execute("ALTER TABLE software_products ADD COLUMN version VARCHAR(50) AFTER description") print("成功添加version字段") except Error as e: print(f"检查version字段时出错: {e}") try: # 检查exe_path字段是否存在 cursor.execute("SHOW COLUMNS FROM software_products LIKE 'exe_path'") if not cursor.fetchone(): cursor.execute("ALTER TABLE software_products ADD COLUMN exe_path VARCHAR(500) AFTER version") print("成功添加exe_path字段") except Error as e: print(f"检查exe_path字段时出错: {e}") # 创建卡密表 cursor.execute(''' CREATE TABLE IF NOT EXISTS license_keys ( id INT AUTO_INCREMENT PRIMARY KEY, key_code VARCHAR(50) NOT NULL UNIQUE, software_id INT NOT NULL, machine_code VARCHAR(100) DEFAULT NULL, start_time DATETIME DEFAULT NULL, end_time DATETIME NOT NULL, status ENUM('unused', 'active', 'expired', 'banned') DEFAULT 'unused', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (software_id) REFERENCES software_products(id) ) ''') self.connection.commit() cursor.close() return True except Error as e: print(f"创建表错误: {e}") return False def generate_key(self, days_valid, software_id): """生成一个新的卡密并保存到数据库""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return None try: # 生成UUID作为基础,然后进行哈希处理 key_uuid = uuid.uuid4().hex hash_obj = hashlib.sha256(key_uuid.encode()) # 取前20个字符作为卡密 key_code = hash_obj.hexdigest()[:20].upper() # 格式化卡密,每5个字符一组 formatted_key = '-'.join([key_code[i:i + 5] for i in range(0, len(key_code), 5)]) # 计算过期时间 end_time = datetime.now() + timedelta(days=days_valid) cursor = self.connection.cursor() query = """ INSERT INTO license_keys (key_code, software_id, end_time) VALUES (%s, %s, %s) """ cursor.execute(query, (formatted_key, software_id, end_time)) self.connection.commit() cursor.close() return formatted_key except Error as e: print(f"生成卡密错误: {e}") return None def validate_key(self, key_code, machine_code, software_id): """验证卡密是否有效,并绑定机器码 - 严格一机一码""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: cursor = self.connection.cursor(dictionary=True) # 查询卡密信息 query = "SELECT * FROM license_keys WHERE key_code = %s AND software_id = %s" cursor.execute(query, (key_code, software_id)) key_info = cursor.fetchone() if not key_info: cursor.close() return False, "无效的激活码" # 检查卡密状态 if key_info['status'] == 'banned': cursor.close() return False, "激活码已被封禁" if key_info['status'] == 'expired': cursor.close() return False, "激活码已过期" # 一机一码严格检查:每个激活码只能在一台机器上使用 if key_info['status'] == 'active': if key_info['machine_code'] != machine_code: # 这个码已经用过了,不能再次使用 cursor.close() return False, f"此激活码已在设备{key_info['machine_code'][:8]}...上使用,一个激活码只能在一台设备上使用一次" else: # 已经激活过这台机器,验证是否过期 if datetime.now() > key_info['end_time']: update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" cursor.execute(update_query, (key_code,)) self.connection.commit() cursor.close() return False, "激活码已过期" else: cursor.close() return True, "此设备已激活,继续使用" # 首次激活:验证通过后绑定到机器 if key_info['status'] == 'unused': update_query = """ UPDATE license_keys SET status = 'active', machine_code = %s, start_time = %s WHERE key_code = %s AND status = 'unused' """ cursor.execute(update_query, (machine_code, datetime.now(), key_code)) rows_affected = cursor.rowcount self.connection.commit() if rows_affected == 0: # 可能已经被其他并发操作激活 cursor.close() return False, "激活码已被使用,请使用新的激活码" # 再次检查是否过期(防止并发问题) final_check_query = "SELECT * FROM license_keys WHERE key_code = %s" cursor.execute(final_check_query, (key_code,)) final_info = cursor.fetchone() if final_info and datetime.now() > final_info['end_time']: update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" cursor.execute(update_query, (key_code,)) self.connection.commit() cursor.close() return False, "激活码已过期" cursor.close() return True, "激活成功" except Error as e: print(f"验证激活码错误: {e}") return False, f"验证过程出错: {str(e)}" def get_all_keys(self, software_id=None): """获取所有卡密信息,可按软件ID筛选""" if not self.connection or not self.connection.is_connected(): print("警告: 数据库未连接,尝试重新连接...") if not self.connect(): print("错误: 数据库重连失败") return [] cursor = None try: cursor = self.connection.cursor(dictionary=True) # 先检查license_keys表是否存在 try: cursor.execute("SHOW TABLES LIKE 'license_keys'") if not cursor.fetchone(): print("警告: license_keys表不存在,请先创建数据库表") if cursor: cursor.close() return [] except Error as e: print(f"检查表存在性时出错: {e}") if cursor: cursor.close() return [] # 检查software_products表是否存在以及字段是否完整 use_join = False try: cursor.execute("SHOW TABLES LIKE 'software_products'") if cursor.fetchone(): # 表存在,检查字段 cursor.execute("SHOW COLUMNS FROM software_products") columns = [row['Field'] for row in cursor.fetchall()] if 'name' in columns: # 如果有name字段,就使用JOIN查询 use_join = True except Error as e: print(f"检查software_products表时出错: {e}") use_join = False # 构建并执行查询 try: if use_join: if software_id: query = """ SELECT lk.*, COALESCE(sp.name, 'Unknown') as software_name FROM license_keys lk LEFT JOIN software_products sp ON lk.software_id = sp.id WHERE lk.software_id = %s ORDER BY lk.created_at DESC """ cursor.execute(query, (software_id,)) else: query = """ SELECT lk.*, COALESCE(sp.name, 'Unknown') as software_name FROM license_keys lk LEFT JOIN software_products sp ON lk.software_id = sp.id ORDER BY lk.created_at DESC """ cursor.execute(query) else: # 不使用JOIN,只查询license_keys表 if software_id: query = "SELECT *, 'Unknown' as software_name FROM license_keys WHERE software_id = %s ORDER BY created_at DESC" cursor.execute(query, (software_id,)) else: query = "SELECT *, 'Unknown' as software_name FROM license_keys ORDER BY created_at DESC" cursor.execute(query) keys = cursor.fetchall() # 确保返回的是列表 if keys is None: keys = [] if len(keys) == 0: print("提示: 数据库中暂无卡密记录") else: print(f"成功从数据库获取 {len(keys)} 个卡密记录") if cursor: cursor.close() return keys except Error as e: print(f"执行查询时出错: {e}") if cursor: cursor.close() return [] except Error as e: print(f"获取卡密列表错误: {type(e).__name__}: {e}") if cursor: cursor.close() return [] except Exception as e: print(f"获取卡密列表时发生未知错误: {type(e).__name__}: {e}") import traceback traceback.print_exc() if cursor: cursor.close() return [] def update_key_status(self, key_code, status): """更新卡密状态""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False try: cursor = self.connection.cursor() query = "UPDATE license_keys SET status = %s WHERE key_code = %s" cursor.execute(query, (status, key_code)) self.connection.commit() cursor.close() return True except Error as e: print(f"更新卡密状态错误: {e}") return False def release_key(self, key_code): """释放已使用的激活码 - 将其重置为未使用状态,清空机器码""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: cursor = self.connection.cursor(dictionary=True) # 检查卡密是否存在且处于已激活状态 check_query = "SELECT * FROM license_keys WHERE key_code = %s" cursor.execute(check_query, (key_code,)) key_info = cursor.fetchone() if not key_info: cursor.close() return False, "激活码不存在" if key_info['status'] != 'active': cursor.close() return False, f"激活码处于 {key_info['status']} 状态,只能释放已使用的激活码" # 释放激活码:重置为未使用状态,清空机器码和开始时间 release_query = """ UPDATE license_keys SET status = 'unused', machine_code = NULL, start_time = NULL WHERE key_code = %s """ cursor.execute(release_query, (key_code,)) rows_affected = cursor.rowcount self.connection.commit() cursor.close() if rows_affected > 0: return True, "激活码已释放,可以重新使用" else: return False, "释放激活码失败" except Error as e: print(f"释放激活码错误: {e}") return False, f"释放过程出错: {str(e)}" def close(self): """关闭数据库连接""" if self.connection and self.connection.is_connected(): self.connection.close() def unbind_key(self, key_code: str) -> Tuple[bool, str]: """强制解除卡密与机器码的绑定""" if not self.connection: return False, "数据库未连接" try: cursor = self.connection.cursor(dictionary=True) cursor.execute( "UPDATE license_keys SET status='unused', machine_code=NULL, start_time=NULL WHERE key_code=%s", (key_code,)) self.connection.commit() rows = cursor.rowcount cursor.close() return (True, "已解绑并释放") if rows else (False, "卡密不存在或状态异常") except Exception as e: return False, str(e) def add_software_product_with_file(self, name: str, description: str = "", version: str = "", exe_source_path: str = "") -> Tuple[bool, str]: """添加软件产品并上传exe文件""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: # 如果提供了exe文件路径,尝试上传 local_exe_path = "" if exe_source_path and self.file_manager: success, msg, relative_path = self.file_manager.upload_exe_file( exe_source_path, name ) if success: local_exe_path = relative_path else: return False, f"文件上传失败: {msg}" elif exe_source_path: # 没有文件管理器,使用原来的逻辑 local_exe_path = exe_source_path cursor = self.connection.cursor() query = """ INSERT INTO software_products (name, description, version, exe_path) VALUES (%s, %s, %s, %s) """ cursor.execute(query, (name, description, version, local_exe_path)) self.connection.commit() cursor.close() if exe_source_path and self.file_manager: return True, f"软件产品添加成功!\n\n✅ 文件已自动保存到本地目录\n目录: {self.file_manager.exe_dir}\n文件名: {local_exe_path}\n\n现在你的EXE文件已经被安全地存储在程序专属目录中,不用担心文件移动问题了!" else: return True, "软件产品添加成功" except Error as e: if "Duplicate entry" in str(e): return False, "软件产品已存在" return False, f"添加软件产品失败: {str(e)}" def get_exe_full_path(self, relative_path: str) -> str: """获取exe文件的完整路径""" if not relative_path: return "" # 如果是绝对路径,直接返回 if os.path.isabs(relative_path): return relative_path # 如果有文件管理器,使用它获取路径 if self.file_manager: return self.file_manager.get_exe_full_path(relative_path) else: # 退回到相对路径逻辑 base_dir = os.path.dirname(__file__) return os.path.join(base_dir, "files", "executables", relative_path) def delete_software_product_with_file(self, product_id: int) -> Tuple[bool, str]: """删除软件产品并删除关联的exe文件""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: cursor = self.connection.cursor(dictionary=True) # 获取软件信息 cursor.execute("SELECT * FROM software_products WHERE id = %s", (product_id,)) product = cursor.fetchone() if not product: cursor.close() return False, "软件产品不存在" # 检查是否有关联的卡密 cursor.execute("SELECT COUNT(*) FROM license_keys WHERE software_id = %s", (product_id,)) key_count = cursor.fetchone()['COUNT(*)'] if key_count > 0: cursor.close() return False, f"无法删除,还有 {key_count} 个关联的卡密" # 删除数据库记录 cursor.execute("DELETE FROM software_products WHERE id = %s", (product_id,)) self.connection.commit() cursor.close() # 删除关联的exe文件 exe_path = product.get('exe_path', '') if exe_path and self.file_manager: # 如果是相对路径,删除文件 if not os.path.isabs(exe_path): file_success, file_msg = self.file_manager.delete_exe_file(exe_path) if not file_success: return True, f"软件产品删除成功,但文件删除失败: {file_msg}" else: return True, f"软件产品和文件删除成功: {file_msg}" return True, "软件产品删除成功" except Error as e: return False, f"删除软件产品失败: {str(e)}" def get_file_storage_info(self) -> Dict: """获取文件存储信息""" if self.file_manager: return self.file_manager.get_storage_info() else: return { 'total_files': 0, 'total_size': 0, 'existing_files': 0, 'missing_files': 0, 'base_dir': '文件管理器未初始化', 'exe_dir': '', 'backup_dir': '' } def get_software_products(self) -> List[Dict]: """获取所有软件产品列表""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return [] try: cursor = self.connection.cursor(dictionary=True) query = "SELECT * FROM software_products ORDER BY name" cursor.execute(query) products = cursor.fetchall() cursor.close() return products except Error as e: print(f"获取软件产品列表错误: {e}") return [] def get_software_by_name(self, name: str) -> Optional[Dict]: """根据名称获取软件产品信息""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return None try: cursor = self.connection.cursor(dictionary=True) query = "SELECT * FROM software_products WHERE name = %s" cursor.execute(query, (name,)) product = cursor.fetchone() cursor.close() return product except Error as e: print(f"获取软件产品信息错误: {e}") return None def update_software_product(self, product_id: int, name: str = None, description: str = None, version: str = None, exe_path: str = None) -> Tuple[bool, str]: """更新软件产品信息""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: cursor = self.connection.cursor() # 构建更新语句 updates = [] params = [] if name is not None: updates.append("name = %s") params.append(name) if description is not None: updates.append("description = %s") params.append(description) if version is not None: updates.append("version = %s") params.append(version) if exe_path is not None: updates.append("exe_path = %s") params.append(exe_path) if not updates: return False, "没有提供更新内容" query = f"UPDATE software_products SET {', '.join(updates)} WHERE id = %s" params.append(product_id) cursor.execute(query, params) self.connection.commit() cursor.close() return True, "软件产品更新成功" except Error as e: if "Duplicate entry" in str(e): return False, "软件名称已存在" return False, f"更新软件产品失败: {str(e)}" def delete_software_product(self, product_id: int) -> Tuple[bool, str]: """删除软件产品""" if not self.connection or not self.connection.is_connected(): if not self.connect(): return False, "数据库连接失败" try: cursor = self.connection.cursor() # 检查是否有关联的卡密 cursor.execute("SELECT COUNT(*) FROM license_keys WHERE software_id = %s", (product_id,)) key_count = cursor.fetchone()[0] if key_count > 0: cursor.close() return False, f"无法删除,还有 {key_count} 个关联的卡密" cursor.execute("DELETE FROM software_products WHERE id = %s", (product_id,)) self.connection.commit() cursor.close() return True, "软件产品删除成功" except Error as e: return False, f"删除软件产品失败: {str(e)}"