import hashlib
import uuid
from contextlib import closing
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import mysql.connector
from mysql.connector import Error


class LicenseDatabase:
    """Store and validate license keys in a MySQL ``license_keys`` table.

    The object holds a single ``mysql.connector`` connection that is opened
    lazily: every public method (re)connects on demand when the connection is
    missing or reported as disconnected. Database errors are printed and
    turned into a failure return value (``False`` / ``None`` / ``[]``)
    instead of being raised.
    """

    def __init__(self, host, database, user, password):
        """Remember the connection parameters; no connection is opened yet."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.connection = None

    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True if the connection was established, False otherwise (the
            error, if any, is printed).
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
            )
            return bool(self.connection.is_connected())
        except Error as e:
            print(f"数据库连接错误: {e}")
            return False

    def _ensure_connected(self) -> bool:
        """Return True if a live connection exists, connecting if necessary."""
        if self.connection and self.connection.is_connected():
            return True
        return self.connect()

    def create_tables(self) -> bool:
        """Create the ``license_keys`` table if it does not exist yet.

        Returns:
            True on success, False if connecting or the DDL statement failed.
        """
        if not self._ensure_connected():
            return False

        try:
            # closing() guarantees the cursor is released even when
            # execute()/commit() raises.
            with closing(self.connection.cursor()) as cursor:
                # License key table.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS license_keys (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        key_code VARCHAR(50) NOT NULL UNIQUE,
                        machine_code VARCHAR(100) DEFAULT NULL,
                        start_time DATETIME DEFAULT NULL,
                        end_time DATETIME NOT NULL,
                        status ENUM('unused', 'active', 'expired', 'banned') DEFAULT 'unused',
                        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                self.connection.commit()
            return True
        except Error as e:
            print(f"创建表错误: {e}")
            return False

    def generate_key(self, days_valid) -> Optional[str]:
        """Generate a new license key and insert it into the database.

        The key is the first 20 hex characters (upper-cased) of the SHA-256
        digest of a random UUID, formatted as four dash-separated groups of
        five characters (``XXXXX-XXXXX-XXXXX-XXXXX``).

        Args:
            days_valid: Number of days from now until the key expires; it is
                passed straight to ``timedelta(days=...)``.

        Returns:
            The formatted key, or None if connecting or the insert failed.
        """
        if not self._ensure_connected():
            return None

        # Random UUID -> SHA-256 -> first 20 hex characters.
        key_uuid = uuid.uuid4().hex
        digest = hashlib.sha256(key_uuid.encode()).hexdigest()
        key_code = digest[:20].upper()
        # Group the key in chunks of five characters for readability.
        formatted_key = '-'.join(
            key_code[i:i + 5] for i in range(0, len(key_code), 5)
        )
        # Expiry is fixed at creation time, not at first activation.
        end_time = datetime.now() + timedelta(days=days_valid)

        query = """
            INSERT INTO license_keys (key_code, end_time)
            VALUES (%s, %s)
        """
        try:
            with closing(self.connection.cursor()) as cursor:
                cursor.execute(query, (formatted_key, end_time))
                self.connection.commit()
            return formatted_key
        except Error as e:
            print(f"生成卡密错误: {e}")
            return None

    def validate_key(self, key_code, machine_code) -> Tuple[bool, str]:
        """Validate a license key and bind it to a machine on first use.

        Checks, in order: the key exists; it is not banned; it is not already
        marked expired; if active, it is bound to ``machine_code``; its
        ``end_time`` has not passed (if it has, the row is marked
        ``expired``). An ``unused`` key that passes all checks is activated:
        its status becomes ``active`` and ``machine_code`` / ``start_time``
        are recorded.

        Args:
            key_code: The formatted key as returned by ``generate_key``.
            machine_code: Identifier of the device requesting validation.

        Returns:
            ``(ok, message)`` where ``ok`` tells whether the key is valid for
            this machine and ``message`` is a user-facing explanation.
        """
        if not self._ensure_connected():
            return False, "数据库连接失败"

        try:
            with closing(self.connection.cursor(dictionary=True)) as cursor:
                # Look up the key.
                cursor.execute(
                    "SELECT * FROM license_keys WHERE key_code = %s",
                    (key_code,),
                )
                key_info = cursor.fetchone()
                # connect() leaves autocommit at the connector's default
                # (off), so the SELECT opened a transaction. End it now so the
                # read-only return paths below do not leave it open and later
                # reads on this long-lived connection are not answered from
                # an old snapshot.
                self.connection.commit()

                if not key_info:
                    return False, "无效的卡密"

                # Reject keys whose stored status already rules them out.
                status = key_info['status']
                if status == 'banned':
                    return False, "卡密已被封禁"
                if status == 'expired':
                    return False, "卡密已过期"

                # An active key may only be used from the bound machine.
                if status == 'active' and key_info['machine_code'] != machine_code:
                    return False, "此卡密已绑定其他设备"

                now = datetime.now()

                # Check expiry BEFORE activating: an unused key that is
                # already past end_time must not be bound to a device first.
                if now > key_info['end_time']:
                    cursor.execute(
                        "UPDATE license_keys SET status = 'expired' WHERE key_code = %s",
                        (key_code,),
                    )
                    self.connection.commit()
                    return False, "卡密已过期"

                # First use: bind the machine code and activate the key.
                if status == 'unused':
                    update_query = """
                        UPDATE license_keys
                        SET status = 'active', machine_code = %s, start_time = %s
                        WHERE key_code = %s
                    """
                    cursor.execute(update_query, (machine_code, now, key_code))
                    self.connection.commit()

                return True, "验证成功"
        except Error as e:
            print(f"验证卡密错误: {e}")
            return False, f"验证过程出错: {str(e)}"

    def get_all_keys(self) -> List[Dict[str, Any]]:
        """Return every row of ``license_keys``, newest first.

        Returns:
            A list of row dictionaries ordered by ``created_at`` descending,
            or an empty list if connecting or the query failed.
        """
        if not self._ensure_connected():
            return []

        try:
            with closing(self.connection.cursor(dictionary=True)) as cursor:
                cursor.execute("SELECT * FROM license_keys ORDER BY created_at DESC")
                keys = cursor.fetchall()
                # End the implicit read transaction so the next call sees
                # current data rather than this call's snapshot.
                self.connection.commit()
            return keys
        except Error as e:
            print(f"获取卡密列表错误: {e}")
            return []

    def update_key_status(self, key_code, status) -> bool:
        """Set the status of a license key.

        Args:
            key_code: The formatted key to update.
            status: New status; the column is an ENUM of ``'unused'``,
                ``'active'``, ``'expired'`` and ``'banned'``.

        Returns:
            True if the statement executed and was committed (this does not
            tell whether any row matched ``key_code``), False if connecting
            or the update failed.
        """
        if not self._ensure_connected():
            return False

        try:
            with closing(self.connection.cursor()) as cursor:
                cursor.execute(
                    "UPDATE license_keys SET status = %s WHERE key_code = %s",
                    (status, key_code),
                )
                self.connection.commit()
            return True
        except Error as e:
            print(f"更新卡密状态错误: {e}")
            return False

    def close(self):
        """Close the database connection if it is currently open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()