上传文件至 /
This commit is contained in:
		
							parent
							
								
									a1b39727bd
								
							
						
					
					
						commit
						ca4e7b7b17
					
				
							
								
								
									
										102
									
								
								config.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								config.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,102 @@ | ||||
| """ | ||||
| Configuration file for the EXE encryption system | ||||
| Contains security settings and system parameters | ||||
| """ | ||||
| 
 | ||||
| import os | ||||
| from pathlib import Path | ||||
| 
 | ||||
| # System configuration | ||||
| SYSTEM_CONFIG = { | ||||
|     'app_name': 'EXE Secure Wrapper', | ||||
|     'version': '2.0.0', | ||||
|     'company': 'Secure Software Solutions', | ||||
|     'contact': 'support@securesoft.com' | ||||
| } | ||||
| 
 | ||||
| # Security settings | ||||
| SECURITY_CONFIG = { | ||||
|     # Encryption settings | ||||
|     'key_length': 32,  # bytes | ||||
|     'salt_length': 32,  # bytes | ||||
|     'iterations': 100000,  # PBKDF2 iterations | ||||
|     'hash_algorithm': 'sha256', | ||||
|     'xor_key': os.environ.get('EXE_WRAPPER_KEY', 'EXEWrapper#2024').encode(), | ||||
| 
 | ||||
|     # File validation | ||||
|     'min_file_size': 1024,  # bytes | ||||
|     'magic_header': b'ENC_MAGIC', | ||||
|     'header_size': 512,  # 配置头固定长度(字节) | ||||
| 
 | ||||
|     # License settings | ||||
|     'trial_days': 7, | ||||
|     'license_key_length': 20, | ||||
|     'license_format': 'XXXXX-XXXXX-XXXXX-XXXXX', | ||||
| } | ||||
| 
 | ||||
| # Temporary file settings | ||||
| TEMP_CONFIG = { | ||||
|     'temp_prefix': 'sec_wrap_', | ||||
|     'max_temp_age': 3600,  # seconds (1 hour) | ||||
|     'auto_cleanup': True, | ||||
| } | ||||
| 
 | ||||
| # Database settings (会被嵌入到包装文件中) | ||||
| DATABASE_CONFIG = { | ||||
|     'mysql': { | ||||
|         'host': os.environ.get('DB_HOST', 'localhost'), | ||||
|         'port': int(os.environ.get('DB_PORT', 3306)), | ||||
|         'database': os.environ.get('DB_NAME', 'license_system'), | ||||
|         'user': os.environ.get('DB_USER', 'root'), | ||||
|         'password': os.environ.get('DB_PASSWORD', ''), | ||||
|         'charset': 'utf8mb4', | ||||
|         'connection_timeout': 30, | ||||
|         'ssl_disabled': True, | ||||
|     }, | ||||
|     'sqlite': { | ||||
|         'filename': 'licenses_local.db', | ||||
|         'check_same_thread': False, | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| # Logging configuration | ||||
| LOGGING_CONFIG = { | ||||
|     'level': 'INFO', | ||||
|     'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', | ||||
|     'file_max_size': 10 * 1024 * 1024,  # 10MB | ||||
|     'backup_count': 5, | ||||
|     'log_dir': 'logs', | ||||
| } | ||||
| 
 | ||||
| # Validation settings | ||||
| VALIDATION_CONFIG = { | ||||
|     'check_internet': True, | ||||
|     'max_retries': 3, | ||||
|     'retry_delay': 1,  # seconds | ||||
|     'timeout': 10,  # seconds | ||||
|     'heartbeat_interval': 300,  # seconds (5 minutes) | ||||
| } | ||||
| 
 | ||||
| # Build paths | ||||
| BASE_DIR = Path(__file__).parent.absolute() | ||||
| CONFIG_DIR = BASE_DIR / "config" | ||||
| LOG_DIR = BASE_DIR / LOGGING_CONFIG['log_dir'] | ||||
| TEMP_DIR = BASE_DIR / "temp" | ||||
| 
 | ||||
| # Ensure directories exist | ||||
| for directory in [LOG_DIR, TEMP_DIR, CONFIG_DIR]: | ||||
|     directory.mkdir(exist_ok=True) | ||||
| 
 | ||||
| def get_config_path(filename): | ||||
|     """Get full path to configuration file""" | ||||
|     return CONFIG_DIR / filename | ||||
| 
 | ||||
| def get_temp_path(filename=None): | ||||
|     """Get temporary file path""" | ||||
|     if filename: | ||||
|         return TEMP_DIR / filename | ||||
|     return TEMP_DIR | ||||
| 
 | ||||
| def get_log_path(filename): | ||||
|     """Get log file path""" | ||||
|     return LOG_DIR / filename | ||||
							
								
								
									
										254
									
								
								database.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										254
									
								
								database.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,254 @@ | ||||
| import mysql.connector | ||||
| from mysql.connector import Error | ||||
| import uuid | ||||
| from datetime import datetime, timedelta | ||||
| import hashlib | ||||
| import os | ||||
| 
 | ||||
| 
 | ||||
| class LicenseDatabase: | ||||
|     def __init__(self, host=None, database=None, user=None, password=None): | ||||
|         self.host = host or os.environ.get('DB_HOST', 'taiyiagi.xyz') | ||||
|         self.database = database or os.environ.get('DB_NAME', 'filesend') | ||||
|         self.user = user or os.environ.get('DB_USER', 'taiyi') | ||||
|         self.password = password or os.environ.get('DB_PASSWORD', 'taiyi1224') | ||||
|         self.connection = None | ||||
| 
 | ||||
|     def connect(self): | ||||
|         """连接到数据库""" | ||||
|         try: | ||||
|             self.connection = mysql.connector.connect( | ||||
|                 host=self.host, | ||||
|                 database=self.database, | ||||
|                 user=self.user, | ||||
|                 password=self.password | ||||
|             ) | ||||
|             if self.connection.is_connected(): | ||||
|                 return True | ||||
|             return False | ||||
|         except Error as e: | ||||
|             print(f"数据库连接错误: {e}") | ||||
|             return False | ||||
| 
 | ||||
|     def create_tables(self): | ||||
|         """创建必要的数据库表""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return False | ||||
| 
 | ||||
|         try: | ||||
|             cursor = self.connection.cursor() | ||||
| 
 | ||||
|             # 创建卡密表 | ||||
|             cursor.execute(''' | ||||
|             CREATE TABLE IF NOT EXISTS license_keys ( | ||||
|                 id INT AUTO_INCREMENT PRIMARY KEY, | ||||
|                 key_code VARCHAR(50) NOT NULL UNIQUE, | ||||
|                 machine_code VARCHAR(100) DEFAULT NULL, | ||||
|                 start_time DATETIME DEFAULT NULL, | ||||
|                 end_time DATETIME NOT NULL, | ||||
|                 status ENUM('unused', 'active', 'expired', 'banned') DEFAULT 'unused', | ||||
|                 created_at DATETIME DEFAULT CURRENT_TIMESTAMP | ||||
|             ) | ||||
|             ''') | ||||
| 
 | ||||
|             self.connection.commit() | ||||
|             cursor.close() | ||||
|             return True | ||||
|         except Error as e: | ||||
|             print(f"创建表错误: {e}") | ||||
|             return False | ||||
| 
 | ||||
|     def generate_key(self, days_valid): | ||||
|         """生成一个新的卡密并保存到数据库""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return None | ||||
| 
 | ||||
|         try: | ||||
|             # 生成UUID作为基础,然后进行哈希处理 | ||||
|             key_uuid = uuid.uuid4().hex | ||||
|             hash_obj = hashlib.sha256(key_uuid.encode()) | ||||
|             # 取前20个字符作为卡密 | ||||
|             key_code = hash_obj.hexdigest()[:20].upper() | ||||
| 
 | ||||
|             # 格式化卡密,每5个字符一组 | ||||
|             formatted_key = '-'.join([key_code[i:i + 5] for i in range(0, len(key_code), 5)]) | ||||
| 
 | ||||
|             # 计算过期时间 | ||||
|             end_time = datetime.now() + timedelta(days=days_valid) | ||||
| 
 | ||||
|             cursor = self.connection.cursor() | ||||
|             query = """ | ||||
|             INSERT INTO license_keys (key_code, end_time) | ||||
|             VALUES (%s, %s) | ||||
|             """ | ||||
|             cursor.execute(query, (formatted_key, end_time)) | ||||
|             self.connection.commit() | ||||
|             cursor.close() | ||||
| 
 | ||||
|             return formatted_key | ||||
|         except Error as e: | ||||
|             print(f"生成卡密错误: {e}") | ||||
|             return None | ||||
| 
 | ||||
|     def validate_key(self, key_code, machine_code): | ||||
|         """验证卡密是否有效,并绑定机器码 - 严格一机一码""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return False, "数据库连接失败" | ||||
| 
 | ||||
|         try: | ||||
|             cursor = self.connection.cursor(dictionary=True) | ||||
| 
 | ||||
|             # 查询卡密信息 | ||||
|             query = "SELECT * FROM license_keys WHERE key_code = %s" | ||||
|             cursor.execute(query, (key_code,)) | ||||
|             key_info = cursor.fetchone() | ||||
| 
 | ||||
|             if not key_info: | ||||
|                 cursor.close() | ||||
|                 return False, "无效的激活码" | ||||
| 
 | ||||
|             # 检查卡密状态 | ||||
|             if key_info['status'] == 'banned': | ||||
|                 cursor.close() | ||||
|                 return False, "激活码已被封禁" | ||||
| 
 | ||||
|             if key_info['status'] == 'expired': | ||||
|                 cursor.close() | ||||
|                 return False, "激活码已过期" | ||||
| 
 | ||||
|             # 一机一码严格检查:每个激活码只能在一台机器上使用 | ||||
|             if key_info['status'] == 'active': | ||||
|                 if key_info['machine_code'] != machine_code: | ||||
|                     # 这个码已经用过了,不能再次使用 | ||||
|                     cursor.close() | ||||
|                     return False, f"此激活码已在设备{key_info['machine_code'][:8]}...上使用,一个激活码只能在一台设备上使用一次" | ||||
|                 else: | ||||
|                     # 已经激活过这台机器,验证是否过期 | ||||
|                     if datetime.now() > key_info['end_time']: | ||||
|                         update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" | ||||
|                         cursor.execute(update_query, (key_code,)) | ||||
|                         self.connection.commit() | ||||
|                         cursor.close() | ||||
|                         return False, "激活码已过期" | ||||
|                     else: | ||||
|                         cursor.close() | ||||
|                         return True, "此设备已激活,继续使用" | ||||
| 
 | ||||
|             # 首次激活:验证通过后绑定到机器 | ||||
|             if key_info['status'] == 'unused': | ||||
|                 update_query = """ | ||||
|                 UPDATE license_keys  | ||||
|                 SET status = 'active', machine_code = %s, start_time = %s  | ||||
|                 WHERE key_code = %s AND status = 'unused' | ||||
|                 """ | ||||
|                 cursor.execute(update_query, (machine_code, datetime.now(), key_code)) | ||||
|                 rows_affected = cursor.rowcount | ||||
|                 self.connection.commit() | ||||
| 
 | ||||
|                 if rows_affected == 0: | ||||
|                     # 可能已经被其他并发操作激活 | ||||
|                     cursor.close() | ||||
|                     return False, "激活码已被使用,请使用新的激活码" | ||||
| 
 | ||||
|             # 再次检查是否过期(防止并发问题) | ||||
|             final_check_query = "SELECT * FROM license_keys WHERE key_code = %s" | ||||
|             cursor.execute(final_check_query, (key_code,)) | ||||
|             final_info = cursor.fetchone() | ||||
| 
 | ||||
|             if final_info and datetime.now() > final_info['end_time']: | ||||
|                 update_query = "UPDATE license_keys SET status = 'expired' WHERE key_code = %s" | ||||
|                 cursor.execute(update_query, (key_code,)) | ||||
|                 self.connection.commit() | ||||
|                 cursor.close() | ||||
|                 return False, "激活码已过期" | ||||
| 
 | ||||
|             cursor.close() | ||||
|             return True, "激活成功" | ||||
| 
 | ||||
|         except Error as e: | ||||
|             print(f"验证激活码错误: {e}") | ||||
|             return False, f"验证过程出错: {str(e)}" | ||||
| 
 | ||||
|     def get_all_keys(self): | ||||
|         """获取所有卡密信息""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return [] | ||||
| 
 | ||||
|         try: | ||||
|             cursor = self.connection.cursor(dictionary=True) | ||||
|             query = "SELECT * FROM license_keys ORDER BY created_at DESC" | ||||
|             cursor.execute(query) | ||||
|             keys = cursor.fetchall() | ||||
|             cursor.close() | ||||
|             return keys | ||||
|         except Error as e: | ||||
|             print(f"获取卡密列表错误: {e}") | ||||
|             return [] | ||||
| 
 | ||||
|     def update_key_status(self, key_code, status): | ||||
|         """更新卡密状态""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return False | ||||
| 
 | ||||
|         try: | ||||
|             cursor = self.connection.cursor() | ||||
|             query = "UPDATE license_keys SET status = %s WHERE key_code = %s" | ||||
|             cursor.execute(query, (status, key_code)) | ||||
|             self.connection.commit() | ||||
|             cursor.close() | ||||
|             return True | ||||
|         except Error as e: | ||||
|             print(f"更新卡密状态错误: {e}") | ||||
|             return False | ||||
| 
 | ||||
|     def release_key(self, key_code): | ||||
|         """释放已使用的激活码 - 将其重置为未使用状态,清空机器码""" | ||||
|         if not self.connection or not self.connection.is_connected(): | ||||
|             if not self.connect(): | ||||
|                 return False, "数据库连接失败" | ||||
| 
 | ||||
|         try: | ||||
|             cursor = self.connection.cursor(dictionary=True) | ||||
| 
 | ||||
|             # 检查卡密是否存在且处于已激活状态 | ||||
|             check_query = "SELECT * FROM license_keys WHERE key_code = %s" | ||||
|             cursor.execute(check_query, (key_code,)) | ||||
|             key_info = cursor.fetchone() | ||||
| 
 | ||||
|             if not key_info: | ||||
|                 cursor.close() | ||||
|                 return False, "激活码不存在" | ||||
| 
 | ||||
|             if key_info['status'] != 'active': | ||||
|                 cursor.close() | ||||
|                 return False, f"激活码处于 {key_info['status']} 状态,只能释放已使用的激活码" | ||||
| 
 | ||||
|             # 释放激活码:重置为未使用状态,清空机器码和开始时间 | ||||
|             release_query = """ | ||||
|             UPDATE license_keys  | ||||
|             SET status = 'unused', machine_code = NULL, start_time = NULL  | ||||
|             WHERE key_code = %s | ||||
|             """ | ||||
|             cursor.execute(release_query, (key_code,)) | ||||
|             rows_affected = cursor.rowcount | ||||
|             self.connection.commit() | ||||
|             cursor.close() | ||||
| 
 | ||||
|             if rows_affected > 0: | ||||
|                 return True, "激活码已释放,可以重新使用" | ||||
|             else: | ||||
|                 return False, "释放激活码失败" | ||||
| 
 | ||||
|         except Error as e: | ||||
|             print(f"释放激活码错误: {e}") | ||||
|             return False, f"释放过程出错: {str(e)}" | ||||
| 
 | ||||
|     def close(self): | ||||
|         """关闭数据库连接""" | ||||
|         if self.connection and self.connection.is_connected(): | ||||
|             self.connection.close() | ||||
							
								
								
									
										6
									
								
								db_config.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								db_config.json
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,6 @@ | ||||
| { | ||||
|   "host": "taiyiagi.xyz", | ||||
|   "database": "filesend", | ||||
|   "user": "taiyi", | ||||
|   "password": "taiyi1224" | ||||
| } | ||||
							
								
								
									
										309
									
								
								encryptor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										309
									
								
								encryptor.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,309 @@ | ||||
| import os | ||||
| import shutil | ||||
| import subprocess | ||||
| import tempfile | ||||
| import json | ||||
| import struct | ||||
| import hashlib | ||||
| import zlib | ||||
| from typing import Tuple, Optional | ||||
| import time | ||||
| 
 | ||||
| class EXEEncryptor: | ||||
|     """EXE文件加密器类 - 提供EXE文件加密、编译和验证功能""" | ||||
|      | ||||
|     def __init__(self): | ||||
|         """初始化加密器""" | ||||
|         pass | ||||
| 
 | ||||
|     # encryptor.py | ||||
| 
 | ||||
|     def encrypt_file(self, source_path: str, output_path: str, validator_path: str, db_config: dict) -> Tuple[ | ||||
|         bool, str]: | ||||
|         """重新实现:将原始exe作为资源嵌入验证器exe中""" | ||||
|         try: | ||||
|             # 读取原始exe内容(作为资源) | ||||
|             with open(source_path, 'rb') as f: | ||||
|                 original_content = f.read() | ||||
| 
 | ||||
|             # 计算原始exe的哈希(用于验证完整性) | ||||
|             file_hash = hashlib.sha256(original_content).hexdigest() | ||||
| 
 | ||||
|             # 创建资源数据(原始exe + 配置信息) | ||||
|             resource_data = { | ||||
|                 'original_exe': original_content.hex(),  # 十六进制字符串 | ||||
|                 'original_size': len(original_content), | ||||
|                 'file_hash': file_hash, | ||||
|                 'db_config': db_config, | ||||
|                 'encryption_time': int(time.time()) | ||||
|             } | ||||
| 
 | ||||
|             # 将资源数据写入Python文件(作为字符串常量) | ||||
|             resource_code = json.dumps(resource_data, separators=(',', ':')) | ||||
| 
 | ||||
|             # 读取验证器模板(validator_wrapper.py) | ||||
|             # with open(validator_path, 'r', encoding='utf-8') as f: | ||||
|             # 读取验证器模板 | ||||
|             with open(validator_path, 'r', encoding='utf-8', errors='ignore') as f: | ||||
|                 validator_template = f.read() | ||||
| 
 | ||||
|             # 构造资源数据(确保是合法 JSON) | ||||
|             resource_data = { | ||||
|                 'original_exe': original_content.hex(), | ||||
|                 'original_size': len(original_content), | ||||
|                 'file_hash': file_hash, | ||||
|                 'db_config': db_config, | ||||
|             } | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|             # 插入资源数据到验证器模板中 | ||||
|             final_validator_code = validator_template.replace( | ||||
|                 '# RESOURCE_DATA_PLACEHOLDER\nRESOURCE_DATA = None', | ||||
|                 f'RESOURCE_DATA = {json.dumps(resource_data)}' | ||||
|             ) | ||||
| 
 | ||||
| 
 | ||||
|             print("插入后的代码片段:") | ||||
|             print(final_validator_code[:500])  # 打印前500字符 | ||||
| 
 | ||||
|             # 写入临时文件 | ||||
|             temp_validator_py = os.path.join(tempfile.gettempdir(), 'temp_validator.py') | ||||
|             with open(temp_validator_py, 'w', encoding='utf-8') as f: | ||||
|                 f.write(final_validator_code) | ||||
| 
 | ||||
|             # 调试:检查是否插入成功 | ||||
|             with open(temp_validator_py, 'r', encoding='utf-8') as f: | ||||
|                 content = f.read() | ||||
|                 if 'RESOURCE_DATA = {"original_exe"' in content: | ||||
|                     print("✅ 资源数据已成功插入") | ||||
|                 else: | ||||
|                     print("❌ 资源数据未插入,请检查替换逻辑") | ||||
| 
 | ||||
|             # 使用PyInstaller将验证器编译为exe | ||||
|             success, msg = self.compile_to_exe(temp_validator_py, output_path) | ||||
|             if not success: | ||||
|                 return False, msg | ||||
| 
 | ||||
|             # 清理临时文件 | ||||
|             if os.path.exists(temp_validator_py): | ||||
|                 os.remove(temp_validator_py) | ||||
| 
 | ||||
|             return True, f"壳程序编译成功:{output_path}" | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             return False, f"套壳失败:{str(e)}" | ||||
|      | ||||
|     def _create_encryption_header(self, original_size: int, db_config: dict, file_hash: str) -> bytes: | ||||
|         """创建加密文件头信息""" | ||||
|         header_data = { | ||||
|             'version': '2.0', | ||||
|             'original_size': original_size, | ||||
|             'encryption_time': int(time.time()), | ||||
|             'compression_level': 6, | ||||
|             'file_hash': file_hash, | ||||
|             'db_config': db_config | ||||
|         } | ||||
|         return json.dumps(header_data).encode('utf-8').ljust(256, b'\x00') | ||||
|      | ||||
|     def _simple_encrypt(self, data: bytes) -> bytes: | ||||
|         """简化版XOR加密""" | ||||
|         key = b'EXEProtector#2024' | ||||
|         encrypted = bytearray(data) | ||||
|         key_len = len(key) | ||||
|          | ||||
|         for i in range(len(encrypted)): | ||||
|             encrypted[i] ^= key[i % key_len] | ||||
|              | ||||
|         return bytes(encrypted) | ||||
| 
 | ||||
|     def compile_to_exe(self, python_file: str, output_path: str = None) -> Tuple[bool, str]: | ||||
|         """尝试将Python文件编译为EXE""" | ||||
|         try: | ||||
|             # 确保Python文件存在 | ||||
|             py_file = python_file.replace('.exe', '.py') | ||||
|             if not os.path.exists(py_file): | ||||
|                 return False, f"Python源文件不存在: {py_file}" | ||||
| 
 | ||||
|             print(f"Debug: 开始编译 {py_file} 到 {python_file}") | ||||
| 
 | ||||
|             # 检查PyInstaller | ||||
|             try: | ||||
|                 result = subprocess.run(['pyinstaller', '--version'], | ||||
|                                         capture_output=True, text=True, timeout=10) | ||||
|                 if result.returncode != 0: | ||||
|                     return self._fallback_to_python_file(py_file, python_file) | ||||
|             except (subprocess.TimeoutExpired, FileNotFoundError): | ||||
|                 return self._fallback_to_python_file(py_file, python_file) | ||||
| 
 | ||||
|             # 获取文件信息 | ||||
|             file_dir = os.path.dirname(os.path.abspath(py_file)) | ||||
|             exe_name = os.path.basename(python_file).replace('.exe', '') | ||||
| 
 | ||||
|             # 切换到文件目录 | ||||
|             original_cwd = os.getcwd() | ||||
|             os.chdir(file_dir) | ||||
| 
 | ||||
|             try: | ||||
|                 # 创建规范文件内容 | ||||
|                 spec_content = self._create_spec_content(py_file, exe_name) | ||||
|                 spec_file = f'{exe_name}.spec' | ||||
| 
 | ||||
|                 with open(spec_file, 'w', encoding='utf-8') as f: | ||||
|                     f.write(spec_content) | ||||
| 
 | ||||
|                 print(f"Debug: 创建规范文件: {spec_file}") | ||||
| 
 | ||||
|                 # 使用规范文件编译 | ||||
|                 cmd = ['pyinstaller', '--clean', '--noconfirm', spec_file] | ||||
| 
 | ||||
|                 print(f"Debug: 执行编译命令: {' '.join(cmd)}") | ||||
| 
 | ||||
|                 # 执行编译 | ||||
|                 result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) | ||||
| 
 | ||||
|                 if result.returncode == 0: | ||||
|                     # 检查输出文件 | ||||
|                     output_exe = os.path.join('dist', f'{exe_name}.exe') | ||||
|                     if os.path.exists(output_exe): | ||||
|                         # 移动到目标位置 | ||||
|                         target_path = os.path.basename(python_file) | ||||
|                         final_output = output_path or os.path.basename(python_file).replace('.py', '.exe') | ||||
|                         os.makedirs(os.path.dirname(final_output), exist_ok=True) | ||||
|                         if os.path.exists(final_output): | ||||
|                             os.remove(final_output) | ||||
|                         shutil.move(output_exe, final_output) | ||||
|                         print(f"Debug: 编译成功,输出文件: {final_output}") | ||||
|                         return True, "EXE编译成功" | ||||
|                     else: | ||||
|                         print(f"Debug: 找不到输出文件: {output_exe}") | ||||
|                         return False, "编译完成但找不到输出文件" | ||||
|                 else: | ||||
|                     # 编译失败,保留Python文件 | ||||
|                     error_info = result.stderr[:500] if result.stderr else result.stdout[:500] | ||||
|                     print(f"Debug: 编译失败: {error_info}") | ||||
|                     return self._fallback_to_python_file(py_file, python_file, f"编译失败: {error_info}") | ||||
| 
 | ||||
|             finally: | ||||
|                 os.chdir(original_cwd) | ||||
| 
 | ||||
|         except subprocess.TimeoutExpired: | ||||
|             print("Debug: 编译超时") | ||||
|             return self._fallback_to_python_file(py_file, python_file, "编译超时") | ||||
|         except Exception as e: | ||||
|             print(f"Debug: 编译过程异常: {e}") | ||||
|             return False, f"编译过程异常: {str(e)}" | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     def _create_spec_content(self, py_file: str, exe_name: str) -> str: | ||||
|         """创建PyInstaller规范文件内容""" | ||||
|         return f'''# -*- mode: python ; coding: utf-8 -*- | ||||
| 
 | ||||
| block_cipher = None | ||||
| 
 | ||||
| a = Analysis( | ||||
|     ['{os.path.basename(py_file)}'], | ||||
|     pathex=[], | ||||
|     binaries=[], | ||||
|     datas=[], | ||||
|     hiddenimports=[ | ||||
|         'mysql.connector', | ||||
|         'cryptography.fernet', | ||||
|         'tkinter', | ||||
|         'tkinter.messagebox', | ||||
|         'tkinter.scrolledtext', | ||||
|         'hashlib', | ||||
|         'subprocess', | ||||
|         'tempfile', | ||||
|         'threading', | ||||
|         'atexit', | ||||
|         'psutil', | ||||
|         'ctypes', | ||||
|         'win32api', | ||||
|         'win32con', | ||||
|         'win32security', | ||||
|         'win32process', | ||||
|     ], | ||||
|     hookspath=[], | ||||
|     hooksconfig={{}}, | ||||
|     runtime_hooks=[], | ||||
|     excludes=[], | ||||
|     win_no_prefer_redirects=False, | ||||
|     win_private_assemblies=False, | ||||
|     cipher=block_cipher, | ||||
|     noarchive=False, | ||||
| ) | ||||
| 
 | ||||
| pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) | ||||
| 
 | ||||
| exe = EXE( | ||||
|     pyz, | ||||
|     a.scripts, | ||||
|     a.binaries, | ||||
|     a.zipfiles, | ||||
|     a.datas, | ||||
|     [], | ||||
|     name='{exe_name}', | ||||
|     debug=False, | ||||
|     bootloader_ignore_signals=False, | ||||
|     strip=False, | ||||
|     upx=False, | ||||
|     upx_exclude=[], | ||||
|     runtime_tmpdir=None, | ||||
|     console=False, | ||||
|     disable_windowed_traceback=False, | ||||
|     argv_emulation=False, | ||||
|     target_arch=None, | ||||
|     codesign_identity=None, | ||||
|     entitlements_file=None, | ||||
|     uac_admin=False, | ||||
|     uac_uiaccess=False, | ||||
| ) | ||||
| ''' | ||||
| 
 | ||||
|     def _fallback_to_python_file(self, py_file: str, target_file: str,  | ||||
|                                 reason: str = "PyInstaller不可用") -> Tuple[bool, str]: | ||||
|         """回退到Python文件""" | ||||
|         try: | ||||
|             # 将.exe改为.py | ||||
|             python_target = target_file.replace('.exe', '.py') | ||||
| 
 | ||||
|             if py_file != python_target: | ||||
|                 if os.path.exists(python_target): | ||||
|                     os.remove(python_target) | ||||
|                 shutil.copy2(py_file, python_target) | ||||
| 
 | ||||
|             message = f"{reason},使用Python文件: {python_target}" | ||||
|             print(f"Debug: {message}") | ||||
|             return True, message | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             error_msg = f"回退失败: {str(e)}" | ||||
|             print(f"Debug: {error_msg}") | ||||
|             return False, error_msg | ||||
| 
 | ||||
|     def _cleanup_build_files(self, exe_name: str): | ||||
|         """清理编译产生的临时文件""" | ||||
|         try: | ||||
|             # 删除build目录 | ||||
|             if os.path.exists('build'): | ||||
|                 shutil.rmtree('build', ignore_errors=True) | ||||
| 
 | ||||
|             # 删除dist目录 | ||||
|             if os.path.exists('dist'): | ||||
|                 shutil.rmtree('dist', ignore_errors=True) | ||||
| 
 | ||||
|             # 删除spec文件 | ||||
|             spec_file = f'{exe_name}.spec' | ||||
|             if os.path.exists(spec_file): | ||||
|                 os.remove(spec_file) | ||||
| 
 | ||||
|             print("Debug: 清理临时文件完成") | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             print(f"Debug: 清理临时文件时出错: {e}") | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
							
								
								
									
										275
									
								
								machine_code.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										275
									
								
								machine_code.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,275 @@ | ||||
| import hashlib | ||||
| import platform | ||||
| import subprocess | ||||
| import sys | ||||
| import uuid | ||||
| 
 | ||||
| 
 | ||||
| def get_windows_machine_code(): | ||||
|     """获取Windows系统的机器码""" | ||||
|     try: | ||||
|         # 尝试获取主板序列号 | ||||
|         try: | ||||
|             wmic_output = subprocess.check_output( | ||||
|                 'wmic baseboard get serialnumber', | ||||
|                 shell=True, | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             if "SerialNumber" in wmic_output: | ||||
|                 lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] | ||||
|                 if len(lines) > 1: | ||||
|                     serial = lines[1] | ||||
|                     if serial and serial not in ["To Be Filled By O.E.M.", "Default string", "N/A", ""]: | ||||
|                         return hashlib.md5(serial.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取硬盘序列号 | ||||
|         try: | ||||
|             wmic_output = subprocess.check_output( | ||||
|                 'wmic diskdrive get serialnumber', | ||||
|                 shell=True, | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             if "SerialNumber" in wmic_output: | ||||
|                 lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] | ||||
|                 for line in lines[1:]:  # 跳过标题行 | ||||
|                     if line and line not in ["", "N/A"]: | ||||
|                         return hashlib.md5(line.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取CPU序列号 | ||||
|         try: | ||||
|             wmic_output = subprocess.check_output( | ||||
|                 'wmic cpu get processorid', | ||||
|                 shell=True, | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             if "ProcessorId" in wmic_output: | ||||
|                 lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] | ||||
|                 if len(lines) > 1: | ||||
|                     cpu_id = lines[1] | ||||
|                     if cpu_id and cpu_id != "": | ||||
|                         return hashlib.md5(cpu_id.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取BIOS序列号 | ||||
|         try: | ||||
|             wmic_output = subprocess.check_output( | ||||
|                 'wmic bios get serialnumber', | ||||
|                 shell=True, | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             if "SerialNumber" in wmic_output: | ||||
|                 lines = [line.strip() for line in wmic_output.split("\n") if line.strip()] | ||||
|                 if len(lines) > 1: | ||||
|                     bios_serial = lines[1] | ||||
|                     if bios_serial and bios_serial not in ["To Be Filled By O.E.M.", "Default string", "N/A", ""]: | ||||
|                         return hashlib.md5(bios_serial.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 组合多个系统信息生成唯一标识 | ||||
|         try: | ||||
|             # 获取网卡MAC地址 | ||||
|             mac = uuid.getnode() | ||||
|             mac_str = ':'.join(['{:02x}'.format((mac >> elements) & 0xff) for elements in range(0, 2 * 6, 2)][::-1]) | ||||
| 
 | ||||
|             # 获取计算机名 | ||||
|             computer_name = platform.node() | ||||
| 
 | ||||
|             # 获取系统信息 | ||||
|             system_info = f"{platform.system()}-{platform.release()}-{platform.version()}" | ||||
| 
 | ||||
|             # 组合信息 | ||||
|             combined_info = f"{mac_str}-{computer_name}-{system_info}" | ||||
|             return hashlib.md5(combined_info.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             # 最后的备用方案 | ||||
|             fallback = f"{platform.uname()}-{uuid.getnode()}" | ||||
|             return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         print(f"获取Windows机器码错误: {e}") | ||||
|         # 生成一个基于系统信息的备用哈希 | ||||
|         try: | ||||
|             fallback = f"{platform.node()}-{uuid.getnode()}-{platform.processor()}" | ||||
|             return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             # 终极备用方案 | ||||
|             import time | ||||
|             fallback = f"FALLBACK-{int(time.time())}-{platform.system()}" | ||||
|             return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
| 
 | ||||
| def get_linux_machine_code(): | ||||
|     """获取Linux系统的机器码""" | ||||
|     try: | ||||
|         # 尝试读取machine-id | ||||
|         try: | ||||
|             with open('/etc/machine-id', 'r') as f: | ||||
|                 machine_id = f.read().strip() | ||||
|             if machine_id and len(machine_id) > 10: | ||||
|                 return machine_id[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试读取/var/lib/dbus/machine-id | ||||
|         try: | ||||
|             with open('/var/lib/dbus/machine-id', 'r') as f: | ||||
|                 machine_id = f.read().strip() | ||||
|             if machine_id and len(machine_id) > 10: | ||||
|                 return machine_id[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取主板信息 | ||||
|         try: | ||||
|             with open('/sys/class/dmi/id/board_serial', 'r') as f: | ||||
|                 board_serial = f.read().strip() | ||||
|             if board_serial and board_serial not in ["", "N/A", "To be filled by O.E.M."]: | ||||
|                 return hashlib.md5(board_serial.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取产品UUID | ||||
|         try: | ||||
|             with open('/sys/class/dmi/id/product_uuid', 'r') as f: | ||||
|                 product_uuid = f.read().strip() | ||||
|             if product_uuid and product_uuid != "": | ||||
|                 return hashlib.md5(product_uuid.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 获取MAC地址和主机名组合 | ||||
|         mac = uuid.getnode() | ||||
|         mac_str = ':'.join(['{:02x}'.format((mac >> elements) & 0xff) for elements in range(0, 2 * 6, 2)][::-1]) | ||||
|         hostname = platform.node() | ||||
|         combined = f"{mac_str}-{hostname}" | ||||
|         return hashlib.md5(combined.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         print(f"获取Linux机器码错误: {e}") | ||||
|         # 备用方案 | ||||
|         system_info = f"{platform.node()}-{uuid.getnode()}-{platform.machine()}" | ||||
|         return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
| 
 | ||||
| def get_mac_machine_code(): | ||||
|     """获取macOS系统的机器码""" | ||||
|     try: | ||||
|         # 尝试获取硬件UUID | ||||
|         try: | ||||
|             result = subprocess.check_output( | ||||
|                 ['system_profiler', 'SPHardwareDataType'], | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             for line in result.split('\n'): | ||||
|                 if 'Hardware UUID' in line: | ||||
|                     uuid_part = line.split(':')[1].strip() | ||||
|                     return hashlib.md5(uuid_part.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 尝试获取序列号 | ||||
|         try: | ||||
|             serial = subprocess.check_output( | ||||
|                 ['system_profiler', 'SPHardwareDataType', '|', 'grep', 'Serial'], | ||||
|                 shell=True, | ||||
|                 stderr=subprocess.STDOUT, | ||||
|                 timeout=10 | ||||
|             ).decode().strip() | ||||
| 
 | ||||
|             if serial: | ||||
|                 serial_number = serial.split(':')[1].strip() | ||||
|                 return hashlib.md5(serial_number.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             pass | ||||
| 
 | ||||
|         # 备用方案 | ||||
|         mac = uuid.getnode() | ||||
|         hostname = platform.node() | ||||
|         combined = f"{mac}-{hostname}-{platform.machine()}" | ||||
|         return hashlib.md5(combined.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         print(f"获取macOS机器码错误: {e}") | ||||
|         # 备用方案 | ||||
|         system_info = f"{platform.node()}-{uuid.getnode()}-{platform.machine()}" | ||||
|         return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
| 
 | ||||
| def get_machine_code(): | ||||
|     """获取当前系统的机器码""" | ||||
|     try: | ||||
|         system = platform.system() | ||||
| 
 | ||||
|         if system == "Windows": | ||||
|             return get_windows_machine_code() | ||||
|         elif system == "Linux": | ||||
|             return get_linux_machine_code() | ||||
|         elif system == "Darwin":  # macOS | ||||
|             return get_mac_machine_code() | ||||
|         else: | ||||
|             # 未知系统,使用通用方法 | ||||
|             system_info = f"{platform.node()}-{uuid.getnode()}-{platform.processor()}-{platform.machine()}" | ||||
|             return hashlib.md5(system_info.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
|     except Exception as e: | ||||
|         print(f"获取机器码失败: {e}") | ||||
|         # 终极备用方案 | ||||
|         try: | ||||
|             fallback = f"{platform.system()}-{uuid.getnode()}-{platform.node()}" | ||||
|             return hashlib.md5(fallback.encode()).hexdigest()[:16].upper() | ||||
|         except: | ||||
|             import time | ||||
|             ultimate_fallback = f"MACHINE-{int(time.time())}" | ||||
|             return hashlib.md5(ultimate_fallback.encode()).hexdigest()[:16].upper() | ||||
| 
 | ||||
| 
 | ||||
| def format_machine_code(machine_code): | ||||
|     """格式化机器码为更易读的格式""" | ||||
|     if len(machine_code) >= 16: | ||||
|         # 将16位机器码格式化为 XXXX-XXXX-XXXX-XXXX | ||||
|         return f"{machine_code[:4]}-{machine_code[4:8]}-{machine_code[8:12]}-{machine_code[12:16]}" | ||||
|     else: | ||||
|         return machine_code | ||||
| 
 | ||||
| 
 | ||||
| def verify_machine_code(stored_code, current_code): | ||||
|     """验证机器码是否匹配""" | ||||
|     # 移除格式化字符进行比较 | ||||
|     stored_clean = stored_code.replace('-', '').upper() | ||||
|     current_clean = current_code.replace('-', '').upper() | ||||
|     return stored_clean == current_clean | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     print("机器码生成器测试") | ||||
|     print("-" * 30) | ||||
| 
 | ||||
|     machine_code = get_machine_code() | ||||
|     formatted_code = format_machine_code(machine_code) | ||||
| 
 | ||||
|     print(f"系统: {platform.system()} {platform.release()}") | ||||
|     print(f"原始机器码: {machine_code}") | ||||
|     print(f"格式化机器码: {formatted_code}") | ||||
|     print(f"机器码长度: {len(machine_code)}") | ||||
| 
 | ||||
|     # 测试验证功能 | ||||
|     print(f"\n验证测试: {verify_machine_code(formatted_code, machine_code)}") | ||||
| 
 | ||||
|     input("\n按回车键退出...") | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user