310 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			310 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|  | import os | |||
|  | import shutil | |||
|  | import subprocess | |||
|  | import tempfile | |||
|  | import json | |||
|  | import struct | |||
|  | import hashlib | |||
|  | import zlib | |||
|  | from typing import Tuple, Optional | |||
|  | import time | |||
|  | 
 | |||
class EXEEncryptor:
    """Wrap an executable inside a PyInstaller-built validator program.

    The class keeps no instance state. ``encrypt_file`` is the main entry
    point; the other methods are the build helpers it relies on plus a few
    standalone utilities (header construction, XOR obfuscation, build
    clean-up).
    """

    # Exact text in the validator template that is replaced by the payload.
    _RESOURCE_PLACEHOLDER = '# RESOURCE_DATA_PLACEHOLDER\nRESOURCE_DATA = None'

    def __init__(self):
        """Create an encryptor; there is nothing to configure."""

    @staticmethod
    def _swap_suffix(path: str, old_suffix: str, new_suffix: str) -> str:
        """Return *path* with a trailing *old_suffix* replaced by *new_suffix*.

        Only the end of the path is examined (case-insensitively), so a
        directory component that merely contains the suffix text is left
        untouched. Paths that do not end with *old_suffix* are returned as-is.
        """
        if path.lower().endswith(old_suffix.lower()):
            return path[:len(path) - len(old_suffix)] + new_suffix
        return path

    def _embed_resource_data(self, template: str, resource_data: dict) -> str:
        """Return *template* with the resource placeholder replaced by the payload.

        Args:
            template: Source code of the validator script.
            resource_data: Dictionary to embed as the ``RESOURCE_DATA`` constant.

        Returns:
            The validator source with every placeholder occurrence replaced.

        Raises:
            ValueError: If the template does not contain the placeholder.
        """
        if self._RESOURCE_PLACEHOLDER not in template:
            raise ValueError("验证器模板中缺少 RESOURCE_DATA 占位符")
        # repr() emits a Python literal. JSON text is not always valid Python:
        # true/false/null would be undefined names in the generated script.
        return template.replace(
            self._RESOURCE_PLACEHOLDER,
            f'RESOURCE_DATA = {resource_data!r}'
        )

    def encrypt_file(self, source_path: str, output_path: str, validator_path: str, db_config: dict) -> Tuple[
        bool, str]:
        """Embed the executable at *source_path* into the validator and compile it.

        The original file is read in full, hex-encoded and stored together
        with its size, SHA-256 digest and *db_config* in the validator
        template's ``RESOURCE_DATA`` constant. The resulting script is
        compiled in a private temporary directory, and the finished
        executable is moved to *output_path* only after it has been produced.

        Args:
            source_path: Path of the executable to wrap.
            output_path: Destination of the compiled shell executable.
            validator_path: Path of the validator template script.
            db_config: Configuration dictionary embedded alongside the payload.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``.
            Exceptions are reported through the return value.
        """
        work_dir = None
        try:
            with open(source_path, 'rb') as f:
                original_content = f.read()

            # Digest of the untouched payload, embedded next to it.
            file_hash = hashlib.sha256(original_content).hexdigest()

            with open(validator_path, 'r', encoding='utf-8', errors='ignore') as f:
                validator_template = f.read()

            resource_data = {
                'original_exe': original_content.hex(),
                'original_size': len(original_content),
                'file_hash': file_hash,
                'db_config': db_config,
            }

            # Raises ValueError (reported below) when the placeholder is absent,
            # instead of compiling a shell that carries no payload.
            final_validator_code = self._embed_resource_data(validator_template, resource_data)

            print("插入后的代码片段:")
            print(final_validator_code[:500])

            # A private directory avoids name clashes between concurrent runs
            # and lets one rmtree remove the script and all PyInstaller output.
            work_dir = tempfile.mkdtemp(prefix='exe_encryptor_')
            temp_validator_py = os.path.join(work_dir, 'temp_validator.py')
            with open(temp_validator_py, 'w', encoding='utf-8') as f:
                f.write(final_validator_code)
            print("✅ 资源数据已成功插入")

            # Build into a staging path first: compile_to_exe may report
            # success after merely falling back to the .py file, and an old
            # file at output_path must not be mistaken for a fresh build.
            staged_exe = os.path.join(work_dir, 'staged', 'shell.exe')
            success, msg = self.compile_to_exe(temp_validator_py, staged_exe)
            if not success:
                return False, msg
            if not os.path.isfile(staged_exe):
                return False, f"套壳失败:未生成输出文件({msg})"

            final_output = os.path.abspath(output_path)
            os.makedirs(os.path.dirname(final_output), exist_ok=True)
            if os.path.exists(final_output):
                os.remove(final_output)
            shutil.move(staged_exe, final_output)

            return True, f"壳程序编译成功:{output_path}"

        except Exception as e:
            return False, f"套壳失败:{str(e)}"
        finally:
            if work_dir is not None:
                shutil.rmtree(work_dir, ignore_errors=True)

    def _create_encryption_header(self, original_size: int, db_config: dict, file_hash: str) -> bytes:
        """Build the JSON metadata header for an encrypted file.

        Args:
            original_size: Size in bytes of the unencrypted payload.
            db_config: Configuration dictionary stored in the header.
            file_hash: Digest string of the unencrypted payload.

        Returns:
            UTF-8 encoded JSON, right-padded with NUL bytes to 256 bytes.
            The header is never truncated: when the JSON is longer than
            256 bytes the result is longer too.
        """
        header_data = {
            'version': '2.0',
            'original_size': original_size,
            'encryption_time': int(time.time()),
            'compression_level': 6,
            'file_hash': file_hash,
            'db_config': db_config
        }
        return json.dumps(header_data).encode('utf-8').ljust(256, b'\x00')

    def _simple_encrypt(self, data: bytes) -> bytes:
        """XOR *data* with a fixed repeating key.

        The operation is its own inverse: applying it twice returns the
        original bytes. The key is a constant in the source, so this is
        obfuscation rather than real encryption.
        """
        key = b'EXEProtector#2024'
        key_len = len(key)
        return bytes(byte ^ key[i % key_len] for i, byte in enumerate(data))

    def compile_to_exe(self, python_file: str, output_path: Optional[str] = None) -> Tuple[bool, str]:
        """Compile a Python script into a one-file executable with PyInstaller.

        Args:
            python_file: Path of the script, or of the intended ``.exe``; a
                trailing ``.exe`` is swapped for ``.py`` to locate the script.
            output_path: Destination of the executable. A relative path is
                resolved against the caller's working directory. Defaults to
                ``<name>.exe`` next to the script.

        Returns:
            ``(True, message)`` when the executable was built, and also when
            PyInstaller is unavailable, fails or times out and the method
            fell back to the plain ``.py`` file. ``(False, reason)`` when the
            script is missing, the build output cannot be found, or an
            unexpected error occurs.
        """
        try:
            py_file = self._swap_suffix(python_file, '.exe', '.py')
            if not os.path.exists(py_file):
                return False, f"Python源文件不存在: {py_file}"

            print(f"Debug: 开始编译 {py_file} 到 {python_file}")

            # Probe for PyInstaller; fall back to the script when it is missing.
            try:
                result = subprocess.run(['pyinstaller', '--version'],
                                        capture_output=True, text=True, timeout=10)
                if result.returncode != 0:
                    return self._fallback_to_python_file(py_file, python_file)
            except (subprocess.TimeoutExpired, FileNotFoundError):
                return self._fallback_to_python_file(py_file, python_file)

            file_dir = os.path.dirname(os.path.abspath(py_file))
            base_name = os.path.basename(python_file)
            stem, ext = os.path.splitext(base_name)
            exe_name = stem if ext.lower() in ('.exe', '.py') else base_name

            # Fix the destination up front so it is always absolute and never
            # depends on the directory PyInstaller runs in.
            if output_path:
                final_output = os.path.abspath(output_path)
            else:
                final_output = os.path.join(file_dir, f'{exe_name}.exe')

            spec_file = f'{exe_name}.spec'
            with open(os.path.join(file_dir, spec_file), 'w', encoding='utf-8') as f:
                f.write(self._create_spec_content(py_file, exe_name))

            print(f"Debug: 创建规范文件: {spec_file}")

            cmd = ['pyinstaller', '--clean', '--noconfirm', spec_file]

            print(f"Debug: 执行编译命令: {' '.join(cmd)}")

            # cwd= runs the build beside the script without changing this
            # process's working directory.
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    timeout=300, cwd=file_dir)

            if result.returncode != 0:
                # Build failed: keep the Python file as the deliverable.
                error_info = result.stderr[:500] if result.stderr else result.stdout[:500]
                print(f"Debug: 编译失败: {error_info}")
                return self._fallback_to_python_file(py_file, python_file, f"编译失败: {error_info}")

            output_exe = os.path.join(file_dir, 'dist', f'{exe_name}.exe')
            if not os.path.exists(output_exe):
                print(f"Debug: 找不到输出文件: {output_exe}")
                return False, "编译完成但找不到输出文件"

            os.makedirs(os.path.dirname(final_output), exist_ok=True)
            if os.path.exists(final_output):
                os.remove(final_output)
            shutil.move(output_exe, final_output)
            print(f"Debug: 编译成功,输出文件: {final_output}")
            return True, "EXE编译成功"

        except subprocess.TimeoutExpired:
            print("Debug: 编译超时")
            return self._fallback_to_python_file(py_file, python_file, "编译超时")
        except Exception as e:
            print(f"Debug: 编译过程异常: {e}")
            return False, f"编译过程异常: {str(e)}"

    def _create_spec_content(self, py_file: str, exe_name: str) -> str:
        """Return the text of a PyInstaller spec file for a one-file build.

        Args:
            py_file: Script to analyse; only its base name is written, so the
                spec is expected to sit in the same directory as the script.
            exe_name: Value of the ``name`` argument passed to ``EXE``.

        Both names are written with ``repr`` so that quotes or backslashes in
        them still yield a syntactically valid spec file.
        """
        return f'''# -*- mode: python ; coding: utf-8 -*-

block_cipher = None

a = Analysis(
    [{os.path.basename(py_file)!r}],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[
        'mysql.connector',
        'cryptography.fernet',
        'tkinter',
        'tkinter.messagebox',
        'tkinter.scrolledtext',
        'hashlib',
        'subprocess',
        'tempfile',
        'threading',
        'atexit',
        'psutil',
        'ctypes',
        'win32api',
        'win32con',
        'win32security',
        'win32process',
    ],
    hookspath=[],
    hooksconfig={{}},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)

pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name={exe_name!r},
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=False,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=False,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    uac_admin=False,
    uac_uiaccess=False,
)
'''

    def _fallback_to_python_file(self, py_file: str, target_file: str,
                                reason: str = "PyInstaller不可用") -> Tuple[bool, str]:
        """Deliver the plain Python script when no executable can be built.

        Args:
            py_file: Existing script to hand over.
            target_file: Intended output path; a trailing ``.exe`` is swapped
                for ``.py`` to obtain the fallback destination.
            reason: Explanation placed at the start of the returned message.

        Returns:
            ``(True, message)`` once the script is available at the fallback
            destination, or ``(False, error)`` if copying failed.
        """
        try:
            python_target = self._swap_suffix(target_file, '.exe', '.py')

            # Compare the files themselves, not the path strings: two
            # spellings of one path must not lead to the source being
            # overwritten or removed.
            already_in_place = (os.path.exists(python_target)
                                and os.path.samefile(py_file, python_target))
            if not already_in_place:
                shutil.copy2(py_file, python_target)

            message = f"{reason},使用Python文件: {python_target}"
            print(f"Debug: {message}")
            return True, message

        except Exception as e:
            error_msg = f"回退失败: {str(e)}"
            print(f"Debug: {error_msg}")
            return False, error_msg

    def _cleanup_build_files(self, exe_name: str, base_dir: Optional[str] = None):
        """Delete PyInstaller's ``build``/``dist`` directories and the spec file.

        Args:
            exe_name: Name used for the build; ``<exe_name>.spec`` is removed.
            base_dir: Directory containing the build artefacts. Defaults to
                the current working directory.

        Note that the whole ``build`` and ``dist`` directories are removed,
        not just the files belonging to *exe_name*. Errors are printed and
        never raised.
        """
        try:
            root = os.getcwd() if base_dir is None else base_dir

            for folder in ('build', 'dist'):
                folder_path = os.path.join(root, folder)
                if os.path.exists(folder_path):
                    shutil.rmtree(folder_path, ignore_errors=True)

            spec_file = os.path.join(root, f'{exe_name}.spec')
            if os.path.exists(spec_file):
                os.remove(spec_file)

            print("Debug: 清理临时文件完成")

        except Exception as e:
            print(f"Debug: 清理临时文件时出错: {e}")
|  | 
 | |||
|  | 
 | |||
|  | 
 | |||
|  | 
 |