From ca4e7b7b17fbb7df612697f6872ede470281f2ea Mon Sep 17 00:00:00 2001 From: taiyi Date: Fri, 8 Aug 2025 18:28:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 102 ++++++++++++++++ database.py | 254 +++++++++++++++++++++++++++++++++++++++ db_config.json | 6 + encryptor.py | 309 ++++++++++++++++++++++++++++++++++++++++++++++++ machine_code.py | 275 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 946 insertions(+) create mode 100644 config.py create mode 100644 database.py create mode 100644 db_config.json create mode 100644 encryptor.py create mode 100644 machine_code.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..5a57e34 --- /dev/null +++ b/config.py @@ -0,0 +1,102 @@ +""" +Configuration file for the EXE encryption system +Contains security settings and system parameters +""" + +import os +from pathlib import Path + +# System configuration +SYSTEM_CONFIG = { + 'app_name': 'EXE Secure Wrapper', + 'version': '2.0.0', + 'company': 'Secure Software Solutions', + 'contact': 'support@securesoft.com' +} + +# Security settings +SECURITY_CONFIG = { + # Encryption settings + 'key_length': 32, # bytes + 'salt_length': 32, # bytes + 'iterations': 100000, # PBKDF2 iterations + 'hash_algorithm': 'sha256', + 'xor_key': os.environ.get('EXE_WRAPPER_KEY', 'EXEWrapper#2024').encode(), + + # File validation + 'min_file_size': 1024, # bytes + 'magic_header': b'ENC_MAGIC', + 'header_size': 512, # 配置头固定长度(字节) + + # License settings + 'trial_days': 7, + 'license_key_length': 20, + 'license_format': 'XXXXX-XXXXX-XXXXX-XXXXX', +} + +# Temporary file settings +TEMP_CONFIG = { + 'temp_prefix': 'sec_wrap_', + 'max_temp_age': 3600, # seconds (1 hour) + 'auto_cleanup': True, +} + +# Database settings (会被嵌入到包装文件中) +DATABASE_CONFIG = { + 'mysql': { + 'host': os.environ.get('DB_HOST', 'localhost'), + 'port': int(os.environ.get('DB_PORT', 3306)), + 'database': os.environ.get('DB_NAME', 'license_system'), + 'user': os.environ.get('DB_USER', 'root'), + 'password': os.environ.get('DB_PASSWORD', ''), + 'charset': 'utf8mb4', + 'connection_timeout': 30, + 'ssl_disabled': True, + }, + 'sqlite': { + 'filename': 'licenses_local.db', + 'check_same_thread': False, + } +} + +# Logging configuration +LOGGING_CONFIG = { + 'level': 'INFO', + 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + 'file_max_size': 10 * 1024 * 1024, # 10MB + 'backup_count': 5, + 'log_dir': 'logs', +} + +# Validation settings +VALIDATION_CONFIG = { + 'check_internet': True, + 'max_retries': 3, + 'retry_delay': 1, # seconds + 'timeout': 10, # seconds + 'heartbeat_interval': 300, # seconds (5 minutes) +} + +# Build paths +BASE_DIR = Path(__file__).parent.absolute() +CONFIG_DIR = BASE_DIR / "config" +LOG_DIR = BASE_DIR / LOGGING_CONFIG['log_dir'] +TEMP_DIR = BASE_DIR / "temp" + +# Ensure directories exist +for directory in [LOG_DIR, TEMP_DIR, CONFIG_DIR]: + directory.mkdir(exist_ok=True) + +def get_config_path(filename): + """Get full path to configuration file""" + return CONFIG_DIR / filename + +def get_temp_path(filename=None): + """Get temporary file path""" + if filename: + return TEMP_DIR / filename + return TEMP_DIR + +def get_log_path(filename): + """Get log file path""" + return LOG_DIR / filename \ No newline at end of file diff --git a/database.py b/database.py new file mode 100644 index 0000000..449bc2e --- /dev/null +++ b/database.py @@ -0,0 +1,254 @@ +import mysql.connector +from mysql.connector import Error +import uuid +from datetime import datetime, timedelta +import hashlib +import os + + +class LicenseDatabase: + def __init__(self, host=None, database=None, user=None, password=None): + self.host = host or os.environ.get('DB_HOST', 'taiyiagi.xyz') + self.database = database or os.environ.get('DB_NAME', 'filesend') + self.user = user or os.environ.get('DB_USER', 'taiyi') + self.password = password or os.environ.get('DB_PASSWORD', 'taiyi1224') + self.connection = None + + def connect(self): + """连接到数据库""" + try: + self.connection = mysql.connector.connect( + host=self.host, + database=self.database, + user=self.user, + password=self.password + ) + if self.connection.is_connected(): + return True + return False + except Error as e: + print(f"数据库连接错误: {e}") + return False + + def create_tables(self): + """创建必要的数据库表""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return False + + try: + cursor = self.connection.cursor() + + # 创建卡密表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS license_keys ( + id INT AUTO_INCREMENT PRIMARY KEY, + key_code VARCHAR(50) NOT NULL UNIQUE, + machine_code VARCHAR(100) DEFAULT NULL, + start_time DATETIME DEFAULT NULL, + end_time DATETIME NOT NULL, + status ENUM('unused', 'active', 'expired', 'banned') DEFAULT 'unused', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + ''') + + self.connection.commit() + cursor.close() + return True + except Error as e: + print(f"创建表错误: {e}") + return False + + def generate_key(self, days_valid): + """生成一个新的卡密并保存到数据库""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return None + + try: + # 生成UUID作为基础,然后进行哈希处理 + key_uuid = uuid.uuid4().hex + hash_obj = hashlib.sha256(key_uuid.encode()) + # 取前20个字符作为卡密 + key_code = hash_obj.hexdigest()[:20].upper() + + # 格式化卡密,每5个字符一组 + formatted_key = '-'.join([key_code[i:i + 5] for i in range(0, len(key_code), 5)]) + + # 计算过期时间 + end_time = datetime.now() + timedelta(days=days_valid) + + cursor = self.connection.cursor() + query = """ + INSERT INTO license_keys (key_code, end_time) + VALUES (%s, %s) + """ + cursor.execute(query, (formatted_key, end_time)) + self.connection.commit() + cursor.close() + + return formatted_key + except Error as e: + print(f"生成卡密错误: {e}") + return None + + def validate_key(self, key_code, machine_code): + """验证卡密是否有效,并绑定机器码 - 严格一机一码""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return False, "数据库连接失败" + + try: + cursor = self.connection.cursor(dictionary=True) + + # 查询卡密信息 + query = "SELECT * FROM license_keys WHERE key_code = %s" + cursor.execute(query, (key_code,)) + key_info = cursor.fetchone() + + if not key_info: + cursor.close() + return False, "无效的激活码" + + # 检查卡密状态 + if key_info['status'] == 'banned': + cursor.close() + return False, "激活码已被封禁" + + if key_info['status'] == 'expired': + cursor.close() + return False, "激活码已过期" + + # 一机一码严格检查:每个激活码只能在一台机器上使用 + if key_info['status'] == 'active': + if key_info['machine_code'] != machine_code: + # 这个码已经用过了,不能再次使用 + cursor.close() + return False, f"此激活码已在设备{key_info['machine_code'][:8]}...上使用,一个激活码只能在一台设备上使用一次" + else: + # 已经激活过这台机器,验证是否过期 + if datetime.now() > key_info['end_time']: + update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" + cursor.execute(update_query, (key_code,)) + self.connection.commit() + cursor.close() + return False, "激活码已过期" + else: + cursor.close() + return True, "此设备已激活,继续使用" + + # 首次激活:验证通过后绑定到机器 + if key_info['status'] == 'unused': + update_query = """ + UPDATE license_keys + SET status = 'active', machine_code = %s, start_time = %s + WHERE key_code = %s AND status = 'unused' + """ + cursor.execute(update_query, (machine_code, datetime.now(), key_code)) + rows_affected = cursor.rowcount + self.connection.commit() + + if rows_affected == 0: + # 可能已经被其他并发操作激活 + cursor.close() + return False, "激活码已被使用,请使用新的激活码" + + # 再次检查是否过期(防止并发问题) + final_check_query = "SELECT * FROM license_keys WHERE key_code = %s" + cursor.execute(final_check_query, (key_code,)) + final_info = cursor.fetchone() + + if final_info and datetime.now() > final_info['end_time']: + update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" + cursor.execute(update_query, (key_code,)) + self.connection.commit() + cursor.close() + return False, "激活码已过期" + + cursor.close() + return True, "激活成功" + + except Error as e: + print(f"验证激活码错误: {e}") + return False, f"验证过程出错: {str(e)}" + + def get_all_keys(self): + """获取所有卡密信息""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return [] + + try: + cursor = self.connection.cursor(dictionary=True) + query = "SELECT * FROM license_keys ORDER BY created_at DESC" + cursor.execute(query) + keys = cursor.fetchall() + cursor.close() + return keys + except Error as e: + print(f"获取卡密列表错误: {e}") + return [] + + def update_key_status(self, key_code, status): + """更新卡密状态""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return False + + try: + cursor = self.connection.cursor() + query = "UPDATE license_keys SET status = %s WHERE key_code = %s" + cursor.execute(query, (status, key_code)) + self.connection.commit() + cursor.close() + return True + except Error as e: + print(f"更新卡密状态错误: {e}") + return False + + def release_key(self, key_code): + """释放已使用的激活码 - 将其重置为未使用状态,清空机器码""" + if not self.connection or not self.connection.is_connected(): + if not self.connect(): + return False, "数据库连接失败" + + try: + cursor = self.connection.cursor(dictionary=True) + + # 检查卡密是否存在且处于已激活状态 + check_query = "SELECT * FROM license_keys WHERE key_code = %s" + cursor.execute(check_query, (key_code,)) + key_info = cursor.fetchone() + + if not key_info: + cursor.close() + return False, "激活码不存在" + + if key_info['status'] != 'active': + cursor.close() + return False, f"激活码处于 {key_info['status']} 状态,只能释放已使用的激活码" + + # 释放激活码:重置为未使用状态,清空机器码和开始时间 + release_query = """ + UPDATE license_keys + SET status = 'unused', machine_code = NULL, start_time = NULL + WHERE key_code = %s + """ + cursor.execute(release_query, (key_code,)) + rows_affected = cursor.rowcount + self.connection.commit() + cursor.close() + + if rows_affected > 0: + return True, "激活码已释放,可以重新使用" + else: + return False, "释放激活码失败" + + except Error as e: + print(f"释放激活码错误: {e}") + return False, f"释放过程出错: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.connection and self.connection.is_connected(): + self.connection.close() diff --git a/db_config.json b/db_config.json new file mode 100644 index 0000000..89977e5 --- /dev/null +++ b/db_config.json @@ -0,0 +1,6 @@ +{ + "host": "taiyiagi.xyz", + "database": "filesend", + "user": "taiyi", + "password": "taiyi1224" +} \ No newline at end of file diff --git a/encryptor.py b/encryptor.py new file mode 100644 index 0000000..c57c99e --- /dev/null +++ b/encryptor.py @@ -0,0 +1,309 @@ +import os +import shutil +import subprocess +import tempfile +import json +import struct +import hashlib +import zlib +from typing import Tuple, Optional +import time + +class EXEEncryptor: + """EXE文件加密器类 - 提供EXE文件加密、编译和验证功能""" + + def __init__(self): + """初始化加密器""" + pass + + # encryptor.py + + def encrypt_file(self, source_path: str, output_path: str, validator_path: str, db_config: dict) -> Tuple[ + bool, str]: + """重新实现:将原始exe作为资源嵌入验证器exe中""" + try: + # 读取原始exe内容(作为资源) + with open(source_path, 'rb') as f: + original_content = f.read() + + # 计算原始exe的哈希(用于验证完整性) + file_hash = hashlib.sha256(original_content).hexdigest() + + # 创建资源数据(原始exe + 配置信息) + resource_data = { + 'original_exe': original_content.hex(), # 十六进制字符串 + 'original_size': len(original_content), + 'file_hash': file_hash, + 'db_config': db_config, + 'encryption_time': int(time.time()) + } + + # 将资源数据写入Python文件(作为字符串常量) + resource_code = json.dumps(resource_data, separators=(',', ':')) + + # 读取验证器模板(validator_wrapper.py) + # with open(validator_path, 'r', encoding='utf-8') as f: + # 读取验证器模板 + with open(validator_path, 'r', encoding='utf-8', errors='ignore') as f: + validator_template = f.read() + + # 构造资源数据(确保是合法 JSON) + resource_data = { + 'original_exe': original_content.hex(), + 'original_size': len(original_content), + 'file_hash': file_hash, + 'db_config': db_config, + } + + + + # 插入资源数据到验证器模板中 + final_validator_code = validator_template.replace( + '# RESOURCE_DATA_PLACEHOLDER\nRESOURCE_DATA = None', + f'RESOURCE_DATA = {json.dumps(resource_data)}' + ) + + + print("插入后的代码片段:") + print(final_validator_code[:500]) # 打印前500字符 + + # 写入临时文件 + temp_validator_py = os.path.join(tempfile.gettempdir(), 'temp_validator.py') + with open(temp_validator_py, 'w', encoding='utf-8') as f: + f.write(final_validator_code) + + # 调试:检查是否插入成功 + with open(temp_validator_py, 'r', encoding='utf-8') as f: + content = f.read() + if 'RESOURCE_DATA = {"original_exe"' in content: + print("✅ 资源数据已成功插入") + else: + print("❌ 资源数据未插入,请检查替换逻辑") + + # 使用PyInstaller将验证器编译为exe + success, msg = self.compile_to_exe(temp_validator_py, output_path) + if not success: + return False, msg + + # 清理临时文件 + if os.path.exists(temp_validator_py): + os.remove(temp_validator_py) + + return True, f"壳程序编译成功:{output_path}" + + except Exception as e: + return False, f"套壳失败:{str(e)}" + + def _create_encryption_header(self, original_size: int, db_config: dict, file_hash: str) -> bytes: + """创建加密文件头信息""" + header_data = { + 'version': '2.0', + 'original_size': original_size, + 'encryption_time': int(time.time()), + 'compression_level': 6, + 'file_hash': file_hash, + 'db_config': db_config + } + return json.dumps(header_data).encode('utf-8').ljust(256, b'\x00') + + def _simple_encrypt(self, data: bytes) -> bytes: + """简化版XOR加密""" + key = b'EXEProtector#2024' + encrypted = bytearray(data) + key_len = len(key) + + for i in range(len(encrypted)): + encrypted[i] ^= key[i % key_len] + + return bytes(encrypted) + + def compile_to_exe(self, python_file: str, output_path: str = None) -> Tuple[bool, str]: + """尝试将Python文件编译为EXE""" + try: + # 确保Python文件存在 + py_file = python_file.replace('.exe', '.py') + if not os.path.exists(py_file): + return False, f"Python源文件不存在: {py_file}" + + print(f"Debug: 开始编译 {py_file} 到 {python_file}") + + # 检查PyInstaller + try: + result = subprocess.run(['pyinstaller', '--version'], + capture_output=True, text=True, timeout=10) + if result.returncode != 0: + return self._fallback_to_python_file(py_file, python_file) + except (subprocess.TimeoutExpired, FileNotFoundError): + return self._fallback_to_python_file(py_file, python_file) + + # 获取文件信息 + file_dir = os.path.dirname(os.path.abspath(py_file)) + exe_name = os.path.basename(python_file).replace('.exe', '') + + # 切换到文件目录 + original_cwd = os.getcwd() + os.chdir(file_dir) + + try: + # 创建规范文件内容 + spec_content = self._create_spec_content(py_file, exe_name) + spec_file = f'{exe_name}.spec' + + with open(spec_file, 'w', encoding='utf-8') as f: + f.write(spec_content) + + print(f"Debug: 创建规范文件: {spec_file}") + + # 使用规范文件编译 + cmd = ['pyinstaller', '--clean', '--noconfirm', spec_file] + + print(f"Debug: 执行编译命令: {' '.join(cmd)}") + + # 执行编译 + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + if result.returncode == 0: + # 检查输出文件 + output_exe = os.path.join('dist', f'{exe_name}.exe') + if os.path.exists(output_exe): + # 移动到目标位置 + target_path = os.path.basename(python_file) + final_output = output_path or os.path.basename(python_file).replace('.py', '.exe') + os.makedirs(os.path.dirname(final_output), exist_ok=True) + if os.path.exists(final_output): + os.remove(final_output) + shutil.move(output_exe, final_output) + print(f"Debug: 编译成功,输出文件: {final_output}") + return True, "EXE编译成功" + else: + print(f"Debug: 找不到输出文件: {output_exe}") + return False, "编译完成但找不到输出文件" + else: + # 编译失败,保留Python文件 + error_info = result.stderr[:500] if result.stderr else result.stdout[:500] + print(f"Debug: 编译失败: {error_info}") + return self._fallback_to_python_file(py_file, python_file, f"编译失败: {error_info}") + + finally: + os.chdir(original_cwd) + + except subprocess.TimeoutExpired: + print("Debug: 编译超时") + return self._fallback_to_python_file(py_file, python_file, "编译超时") + except Exception as e: + print(f"Debug: 编译过程异常: {e}") + return False, f"编译过程异常: {str(e)}" + + + + def _create_spec_content(self, py_file: str, exe_name: str) -> str: + """创建PyInstaller规范文件内容""" + return f'''# -*- mode: python ; coding: utf-8 -*- + +block_cipher = None + +a = Analysis( + ['{os.path.basename(py_file)}'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[ + 'mysql.connector', + 'cryptography.fernet', + 'tkinter', + 'tkinter.messagebox', + 'tkinter.scrolledtext', + 'hashlib', + 'subprocess', + 'tempfile', + 'threading', + 'atexit', + 'psutil', + 'ctypes', + 'win32api', + 'win32con', + 'win32security', + 'win32process', + ], + hookspath=[], + hooksconfig={{}}, + runtime_hooks=[], + excludes=[], + win_no_prefer_redirects=False, + win_private_assemblies=False, + cipher=block_cipher, + noarchive=False, +) + +pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.zipfiles, + a.datas, + [], + name='{exe_name}', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=False, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, + uac_admin=False, + uac_uiaccess=False, +) +''' + + def _fallback_to_python_file(self, py_file: str, target_file: str, + reason: str = "PyInstaller不可用") -> Tuple[bool, str]: + """回退到Python文件""" + try: + # 将.exe改为.py + python_target = target_file.replace('.exe', '.py') + + if py_file != python_target: + if os.path.exists(python_target): + os.remove(python_target) + shutil.copy2(py_file, python_target) + + message = f"{reason},使用Python文件: {python_target}" + print(f"Debug: {message}") + return True, message + + except Exception as e: + error_msg = f"回退失败: {str(e)}" + print(f"Debug: {error_msg}") + return False, error_msg + + def _cleanup_build_files(self, exe_name: str): + """清理编译产生的临时文件""" + try: + # 删除build目录 + if os.path.exists('build'): + shutil.rmtree('build', ignore_errors=True) + + # 删除dist目录 + if os.path.exists('dist'): + shutil.rmtree('dist', ignore_errors=True) + + # 删除spec文件 + spec_file = f'{exe_name}.spec' + if os.path.exists(spec_file): + os.remove(spec_file) + + print("Debug: 清理临时文件完成") + + except Exception as e: + print(f"Debug: 清理临时文件时出错: {e}") + + + + diff --git a/machine_code.py b/machine_code.py new file mode 100644 index 0000000..740df74 --- /dev/null +++ b/machine_code.py @@ -0,0 +1,275 @@ +import hashlib +import platform +import subprocess +import sys +import uuid + + +def get_windows_machine_code(): + """获取Windows系统的机器码""" + try: + # 尝试获取主板序列号 + try: + wmic_output = subprocess.check_output( + 'wmic baseboard get serialnumber', + shell=True, + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + if "SerialNumber" in wmic_output: + lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] + if len(lines) > 1: + serial = lines[1] + if serial and serial not in ["To Be Filled By O.E.M.", "Default string", "N/A", ""]: + return hashlib.md5(serial.encode()).hexdigest()[:16].upper() + except: + pass + + # 尝试获取硬盘序列号 + try: + wmic_output = subprocess.check_output( + 'wmic diskdrive get serialnumber', + shell=True, + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + if "SerialNumber" in wmic_output: + lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] + for line in lines[1:]: # 跳过标题行 + if line and line not in ["", "N/A"]: + return hashlib.md5(line.encode()).hexdigest()[:16].upper() + except: + pass + + # 尝试获取CPU序列号 + try: + wmic_output = subprocess.check_output( + 'wmic cpu get processorid', + shell=True, + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + if "ProcessorId" in wmic_output: + lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] + if len(lines) > 1: + cpu_id = lines[1] + if cpu_id and cpu_id != "": + return hashlib.md5(cpu_id.encode()).hexdigest()[:16].upper() + except: + pass + + # 尝试获取BIOS序列号 + try: + wmic_output = subprocess.check_output( + 'wmic bios get serialnumber', + shell=True, + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + if "SerialNumber" in wmic_output: + lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] + if len(lines) > 1: + bios_serial = lines[1] + if bios_serial and bios_serial not in ["To Be Filled By O.E.M.", "Default string", "N/A", ""]: + return hashlib.md5(bios_serial.encode()).hexdigest()[:16].upper() + except: + pass + + # 组合多个系统信息生成唯一标识 + try: + # 获取网卡MAC地址 + mac = uuid.getnode() + mac_str = ':'.join(['{:02x}'.format((mac >> elements) & 0xff) for elements in range(0, 2 * 6, 2)][::-1]) + + # 获取计算机名 + computer_name = platform.node() + + # 获取系统信息 + system_info = f"{platform.system()}-{platform.release()}-{platform.version()}" + + # 组合信息 + combined_info = f"{mac_str}-{computer_name}-{system_info}" + return hashlib.md5(combined_info.encode()).hexdigest()[:16].upper() + + except Exception as e: + # 最后的备用方案 + fallback = f"{platform.uname()}-{uuid.getnode()}" + return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() + + except Exception as e: + print(f"获取Windows机器码错误: {e}") + # 生成一个基于系统信息的备用哈希 + try: + fallback = f"{platform.node()}-{uuid.getnode()}-{platform.processor()}" + return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() + except: + # 终极备用方案 + import time + fallback = f"FALLBACK-{int(time.time())}-{platform.system()}" + return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() + + +def get_linux_machine_code(): + """获取Linux系统的机器码""" + try: + # 尝试读取machine-id + try: + with open('/etc/machine-id', 'r') as f: + machine_id = f.read().strip() + if machine_id and len(machine_id) > 10: + return machine_id[:16].upper() + except: + pass + + # 尝试读取/var/lib/dbus/machine-id + try: + with open('/var/lib/dbus/machine-id', 'r') as f: + machine_id = f.read().strip() + if machine_id and len(machine_id) > 10: + return machine_id[:16].upper() + except: + pass + + # 尝试获取主板信息 + try: + with open('/sys/class/dmi/id/board_serial', 'r') as f: + board_serial = f.read().strip() + if board_serial and board_serial not in ["", "N/A", "To be filled by O.E.M."]: + return hashlib.md5(board_serial.encode()).hexdigest()[:16].upper() + except: + pass + + # 尝试获取产品UUID + try: + with open('/sys/class/dmi/id/product_uuid', 'r') as f: + product_uuid = f.read().strip() + if product_uuid and product_uuid != "": + return hashlib.md5(product_uuid.encode()).hexdigest()[:16].upper() + except: + pass + + # 获取MAC地址和主机名组合 + mac = uuid.getnode() + mac_str = ':'.join(['{:02x}'.format((mac >> elements) & 0xff) for elements in range(0, 2 * 6, 2)][::-1]) + hostname = platform.node() + combined = f"{mac_str}-{hostname}" + return hashlib.md5(combined.encode()).hexdigest()[:16].upper() + + except Exception as e: + print(f"获取Linux机器码错误: {e}") + # 备用方案 + system_info = f"{platform.node()}-{uuid.getnode()}-{platform.machine()}" + return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() + + +def get_mac_machine_code(): + """获取macOS系统的机器码""" + try: + # 尝试获取硬件UUID + try: + result = subprocess.check_output( + ['system_profiler', 'SPHardwareDataType'], + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + for line in result.split('\n'): + if 'Hardware UUID' in line: + uuid_part = line.split(':')[1].strip() + return hashlib.md5(uuid_part.encode()).hexdigest()[:16].upper() + except: + pass + + # 尝试获取序列号 + try: + serial = subprocess.check_output( + ['system_profiler', 'SPHardwareDataType', '|', 'grep', 'Serial'], + shell=True, + stderr=subprocess.STDOUT, + timeout=10 + ).decode().strip() + + if serial: + serial_number = serial.split(':')[1].strip() + return hashlib.md5(serial_number.encode()).hexdigest()[:16].upper() + except: + pass + + # 备用方案 + mac = uuid.getnode() + hostname = platform.node() + combined = f"{mac}-{hostname}-{platform.machine()}" + return hashlib.md5(combined.encode()).hexdigest()[:16].upper() + + except Exception as e: + print(f"获取macOS机器码错误: {e}") + # 备用方案 + system_info = f"{platform.node()}-{uuid.getnode()}-{platform.machine()}" + return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() + + +def get_machine_code(): + """获取当前系统的机器码""" + try: + system = platform.system() + + if system == "Windows": + return get_windows_machine_code() + elif system == "Linux": + return get_linux_machine_code() + elif system == "Darwin": # macOS + return get_mac_machine_code() + else: + # 未知系统,使用通用方法 + system_info = f"{platform.node()}-{uuid.getnode()}-{platform.processor()}-{platform.machine()}" + return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() + + except Exception as e: + print(f"获取机器码失败: {e}") + # 终极备用方案 + try: + fallback = f"{platform.system()}-{uuid.getnode()}-{platform.node()}" + return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() + except: + import time + ultimate_fallback = f"MACHINE-{int(time.time())}" + return hashlib.md5(ultimate_fallback.encode()).hexdigest()[:16].upper() + + +def format_machine_code(machine_code): + """格式化机器码为更易读的格式""" + if len(machine_code) >= 16: + # 将16位机器码格式化为 XXXX-XXXX-XXXX-XXXX + return f"{machine_code[:4]}-{machine_code[4:8]}-{machine_code[8:12]}-{machine_code[12:16]}" + else: + return machine_code + + +def verify_machine_code(stored_code, current_code): + """验证机器码是否匹配""" + # 移除格式化字符进行比较 + stored_clean = stored_code.replace('-', '').upper() + current_clean = current_code.replace('-', '').upper() + return stored_clean == current_clean + + +if __name__ == "__main__": + print("机器码生成器测试") + print("-" * 30) + + machine_code = get_machine_code() + formatted_code = format_machine_code(machine_code) + + print(f"系统: {platform.system()} {platform.release()}") + print(f"原始机器码: {machine_code}") + print(f"格式化机器码: {formatted_code}") + print(f"机器码长度: {len(machine_code)}") + + # 测试验证功能 + print(f"\n验证测试: {verify_machine_code(formatted_code, machine_code)}") + + input("\n按回车键退出...") \ No newline at end of file