Exeprotector/validator_wrapper.py
2025-09-05 11:52:53 +08:00

657 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
EXE Wrapper Validator - 外层验证程序
用于验证卡密后启动原始exe文件包含防护措施
(已整合:自动本地缓存验证 + UI 美化 + 修复本地缓存解密)
"""
import os
import sys
import struct
import json
import hashlib
import tempfile
import subprocess
import mysql.connector
from datetime import datetime
import tkinter as tk
from tkinter import messagebox, ttk
import threading
import time
import psutil
import ctypes
import win32con
import os, json, base64
from cryptography.fernet import Fernet # pip install cryptography
from machine_code import get_machine_code
LICENSE_FILE = os.path.join(tempfile.gettempdir(), '.lic_cache')
def _get_fernet() -> Fernet:
"""用机器码作为密钥,保证同一机器才能解密"""
machine_key = get_machine_code().encode()[:32].ljust(32, b'0')
return Fernet(base64.urlsafe_b64encode(machine_key))
def save_license(key: str):
"""加密保存卡密"""
data = {'key': key, 'machine': get_machine_code()}
with open(LICENSE_FILE, 'wb') as f:
f.write(_get_fernet().encrypt(json.dumps(data).encode()))
def load_license() -> str | None:
"""解密读取卡密,若文件不存在/被篡改返回 None"""
try:
with open(LICENSE_FILE, 'rb') as f:
data = json.loads(_get_fernet().decrypt(f.read()).decode())
return data['key'] if data.get('machine') == get_machine_code() else None
except Exception:
return None
# RESOURCE_DATA_PLACEHOLDER
RESOURCE_DATA = None
class EXEWrapperValidator:
"""EXE外层验证器 - 验证卡密后启动原始程序"""
def __init__(self):
self.temp_dir = None
self.original_exe_path = None
self.process_handle = None
self.anti_debug_enabled = True
self.anti_vm_enabled = True
self.process_monitoring = True
def show_license_dialog(self):
"""显示许可证激活对话框(美化版)"""
def style_button(btn, bg_color, hover_color):
def on_enter(e): btn['bg'] = hover_color
def on_leave(e): btn['bg'] = bg_color
btn.bind("<Enter>", on_enter)
btn.bind("<Leave>", on_leave)
def on_activate():
key = key_entry.get().strip()
if not key:
messagebox.showerror("错误", "请输入激活码")
return
activate_btn.config(state=tk.DISABLED, text="验证中...")
status_label.config(text="正在验证激活码...", fg="blue")
root.update()
def validate_thread():
success, msg = self.validate_license(key, self.get_machine_code())
root.after(0, lambda: self.handle_validation_result(success, msg, root, key))
threading.Thread(target=validate_thread, daemon=True).start()
machine_code = self.get_machine_code()
root = tk.Tk()
root.title("软件许可证验证")
root.geometry("520x460")
root.configure(bg="#ffffff")
root.resizable(False, False)
# 居中窗口
root.update_idletasks()
x = (root.winfo_screenwidth() - root.winfo_width()) // 2
y = (root.winfo_screenheight() - root.winfo_height()) // 2
root.geometry(f"520x460+{x}+{y}")
# 标题
tk.Label(root, text="🔐 软件许可证验证",
font=("Microsoft YaHei", 18, "bold"),
bg="#ffffff", fg="#2c3e50").pack(pady=12)
# 机器码框架
machine_frame = tk.Frame(root, bg="#ffffff")
machine_frame.pack(fill=tk.X, padx=30, pady=6)
tk.Label(machine_frame, text="机器码:", bg="#ffffff",
font=("Microsoft YaHei", 10, "bold")).pack(anchor=tk.W)
machine_entry = tk.Entry(machine_frame, state="readonly", width=45,
font=("Consolas", 10), bg="#f2f4f7", relief=tk.SOLID, bd=1)
machine_entry.insert(0, machine_code)
machine_entry.pack(fill=tk.X, pady=6)
# 复制机器码按钮
copy_btn = tk.Button(machine_frame, text="复制机器码",
command=lambda: self.copy_to_clipboard(machine_code),
bg="#3498db", fg="white", font=("Microsoft YaHei", 9),
relief=tk.FLAT, padx=8, cursor="hand2")
style_button(copy_btn, "#3498db", "#5dade2")
copy_btn.pack(anchor=tk.E, pady=2)
# 激活码框架
key_frame = tk.Frame(root, bg="#ffffff")
key_frame.pack(fill=tk.X, padx=30, pady=10)
tk.Label(key_frame, text="激活码:", bg="#ffffff",
font=("Microsoft YaHei", 10, "bold")).pack(anchor=tk.W)
key_entry = tk.Entry(key_frame, width=40, font=("Consolas", 12),
bg="#fbfbfb", relief=tk.SOLID, bd=2,
highlightcolor="#3498db", highlightthickness=2)
key_entry.pack(fill=tk.X, pady=6)
# 说明文字
info_label = tk.Label(root, text="请输入激活码格式XXXXX-XXXXX-XXXXX-XXXXX",
bg="#ffffff", fg="#7f8c8d", font=("Microsoft YaHei", 9))
info_label.pack(pady=4)
status_label = tk.Label(root, text="", bg="#ffffff", font=("Microsoft YaHei", 9))
status_label.pack(pady=6)
# 按钮框架
button_frame = tk.Frame(root, bg="#ffffff")
button_frame.pack(fill=tk.X, padx=30, pady=12)
activate_btn = tk.Button(button_frame, text="验证并启动", command=on_activate,
bg="#27ae60", fg="white", padx=26, pady=8,
font=("Microsoft YaHei", 11, "bold"),
relief=tk.FLAT, cursor="hand2")
style_button(activate_btn, "#27ae60", "#2ecc71")
activate_btn.pack(side=tk.LEFT, padx=8)
cancel_btn = tk.Button(button_frame, text="退出",
command=lambda: [root.destroy(), sys.exit(0)],
bg="#e74c3c", fg="white", padx=26, pady=8,
font=("Microsoft YaHei", 11, "bold"),
relief=tk.FLAT, cursor="hand2")
style_button(cancel_btn, "#e74c3c", "#ff6b6b")
cancel_btn.pack(side=tk.RIGHT, padx=8)
key_entry.focus()
root.bind('<Return>', lambda e: on_activate())
root.bind('<Escape>', lambda e: [root.destroy(), sys.exit(0)])
root.protocol("WM_DELETE_WINDOW", lambda: [root.destroy(), sys.exit(0)])
root.mainloop()
def handle_validation_result(self, success, msg, root,key):
"""处理验证结果"""
if success:
messagebox.showinfo("验证成功", "激活码验证成功!正在启动程序...")
save_license(key) # <== 新增
root.destroy()
# 提取并运行原始程序
extracted_path = self.extract_original_program()
if extracted_path:
self.launch_program_with_protection(extracted_path)
else:
messagebox.showerror("错误", "无法提取原始程序")
sys.exit(1)
else:
messagebox.showerror("验证失败", msg)
# 重置按钮状态
for widget in root.winfo_children():
if isinstance(widget, tk.Frame):
for child in widget.winfo_children():
if isinstance(child, tk.Button) and "验证" in child.cget("text"):
child.config(state=tk.NORMAL, text="验证并启动")
break
def copy_to_clipboard(self, text):
"""复制文本到剪贴板"""
try:
import pyperclip
pyperclip.copy(text)
messagebox.showinfo("成功", "机器码已复制到剪贴板")
except ImportError:
# 使用tkinter的回退方法
root = tk.Tk()
root.withdraw()
root.clipboard_clear()
root.clipboard_append(text)
root.update()
root.destroy()
messagebox.showinfo("成功", "机器码已复制到剪贴板")
except Exception as e:
messagebox.showwarning("警告", f"无法复制到剪贴板: {str(e)}")
def get_machine_code(self):
"""生成唯一机器码"""
try:
if os.name == 'nt': # Windows
return self.get_windows_machine_code()
else: # Linux/Mac
return self.get_unix_machine_code()
except Exception as e:
# 回退方法
import platform
import uuid
unique = f"{platform.node()}{uuid.getnode()}{platform.processor()}"
return hashlib.md5(unique.encode("utf-8")).hexdigest()[:16].upper()
def get_windows_machine_code(self):
"""获取Windows机器码"""
try:
# 尝试获取主板序列号
try:
result = subprocess.check_output(
'wmic baseboard get serialnumber',
shell=True, stderr=subprocess.STDOUT
).decode().strip()
if "SerialNumber" in result:
serial = result.split("\n")[1].strip()
if serial and serial != "To Be Filled By O.E.M.":
return hashlib.md5(serial.encode()).hexdigest()[:16].upper()
except:
pass
# 尝试获取CPU ID
try:
result = subprocess.check_output(
'wmic cpu get processorid',
shell=True, stderr=subprocess.STDOUT
).decode().strip()
if "ProcessorId" in result:
cpu_id = result.split("\n")[1].strip()
if cpu_id:
return hashlib.md5(cpu_id.encode()).hexdigest()[:16].upper()
except:
pass
# 回退
import platform
import uuid
unique = f"{platform.node()}{uuid.getnode()}"
return hashlib.md5(unique.encode()).hexdigest()[:16].upper()
except Exception as e:
import platform
import uuid
unique = f"{platform.node()}{uuid.getnode()}"
return hashlib.md5(unique.encode()).hexdigest()[:16].upper()
def get_unix_machine_code(self):
"""获取Unix/Linux机器码"""
try:
import subprocess
import uuid
# 尝试获取MAC地址
mac = uuid.getnode()
mac_str = ':'.join(['{:02x}'.format((mac >> elements) & 0xff)
for elements in range(0, 2 * 6, 2)][::-1])
# 尝试获取machine-id或hostname
try:
with open('/etc/machine-id', 'r') as f:
machine_id = f.read().strip()
unique = f"{mac_str}{machine_id}"
except:
import platform
unique = f"{mac_str}{platform.node()}"
return hashlib.md5(unique.encode()).hexdigest()[:16].upper()
except Exception as e:
import platform
import uuid
unique = f"{platform.node()}{uuid.getnode()}"
return hashlib.md5(unique.encode()).hexdigest()[:16].upper()
def load_resource_data(self):
"""从嵌入的 RESOURCE_DATA 中加载原始EXE和配置"""
try:
if RESOURCE_DATA is None:
return False, "未找到嵌入的资源数据", None
# 提取原始exe内容
original_hex = RESOURCE_DATA['original_exe']
original_content = bytes.fromhex(original_hex)
# 提取配置
config = {
'original_size': RESOURCE_DATA['original_size'],
'file_hash': RESOURCE_DATA['file_hash'],
'db_config': RESOURCE_DATA['db_config']
}
return True, config, original_content
except Exception as e:
return False, f"加载资源失败: {str(e)}", None
def validate_license(self, license_key, machine_code):
"""验证许可证"""
try:
# 验证密钥格式
parts = license_key.split('-')
if len(parts) != 4:
return False, "激活码格式错误应为XXXXX-XXXXX-XXXXX-XXXXX"
for part in parts:
if len(part) != 5:
return False, "激活码格式错误每段应为5个字符"
# 加载嵌入的资源数据
success, config, original_content = self.load_resource_data()
if not success:
return False, f"无法读取配置:{config}"
# 提取数据库配置
db_config = config.get('db_config', {})
host = db_config.get('host', 'localhost')
database = db_config.get('database', 'license_system')
user = db_config.get('user', '')
password = db_config.get('password', '')
# 连接MySQL数据库
try:
conn = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database,
connection_timeout=10,
autocommit=True
)
cursor = conn.cursor(dictionary=True)
except mysql.connector.Error as e:
return False, f"数据库连接失败:{str(e)}"
try:
# 检查许可证是否存在且有效
query = "SELECT * FROM license_keys WHERE key_code = %s"
cursor.execute(query, (license_key,))
result = cursor.fetchone()
if not result:
return False, "激活码不存在"
if result['status'] == 'banned':
return False, "激活码已被封禁"
if result['end_time'] < datetime.now():
cursor.execute(
"UPDATE license_keys SET status = 'expired' WHERE key_code = %s",
(license_key,)
)
return False, "激活码已过期"
if result['status'] == 'active':
if result['machine_code'] != machine_code:
return False, f"此激活码已在其他设备上使用设备ID{result['machine_code'][:8]}..."
else:
return True, "激活验证成功"
if result['status'] == 'unused':
cursor.execute("""
UPDATE license_keys
SET status = 'active', machine_code = %s, start_time = NOW()
WHERE key_code = %s AND status = 'unused'
""", (machine_code, license_key))
if cursor.rowcount == 0:
return False, "激活码已被其他用户使用"
return True, "激活成功"
return False, f"激活码状态异常:{result['status']}"
finally:
cursor.close()
conn.close()
except Exception as e:
return False, f"验证过程出错:{str(e)}"
def extract_original_program(self):
"""从嵌入资源中提取原始exe到临时文件"""
try:
success, config, original_content = self.load_resource_data()
if not success:
messagebox.showerror("错误", config)
return None
# 验证完整性
expected_hash = config['file_hash']
actual_hash = hashlib.sha256(original_content).hexdigest()
if expected_hash != actual_hash:
messagebox.showerror("错误", "原始文件校验失败")
return None
# 创建临时文件
temp_dir = tempfile.gettempdir()
temp_file_path = os.path.join(temp_dir, f"original_program_{os.getpid()}.exe")
with open(temp_file_path, 'wb') as f:
f.write(original_content)
# 设置可执行权限
if os.name != 'nt':
os.chmod(temp_file_path, 0o755)
self.original_exe_path = temp_file_path
return temp_file_path
except Exception as e:
messagebox.showerror("错误", f"提取原始程序失败: {str(e)}")
return None
def _simple_decrypt(self, data):
"""简化版XOR解密"""
key = b'EXEProtector#2024'
decrypted = bytearray(data)
key_len = len(key)
for i in range(len(decrypted)):
decrypted[i] ^= key[i % key_len]
return bytes(decrypted)
def _decompress_data(self, data):
"""解压缩数据"""
import zlib
return zlib.decompress(data)
def launch_program_with_protection(self, program_path):
"""启动程序并应用防护措施"""
try:
# 应用防护措施
self.apply_protection_measures()
# 启动程序
if os.name == 'nt': # Windows
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = win32con.SW_SHOW # ✅ 正确来源
process = subprocess.Popen([program_path], startupinfo=startupinfo)
# process = subprocess.Popen([program_path], startupinfo=startupinfo)
self.process_handle = process.pid
# 启动监控线程
if self.process_monitoring:
threading.Thread(target=self.monitor_process,
args=(process.pid,), daemon=True).start()
# 等待程序启动
time.sleep(1)
# 退出验证器
sys.exit(0)
else: # Linux/Mac
subprocess.Popen([program_path])
sys.exit(0)
except Exception as e:
messagebox.showerror("错误", f"启动程序失败: {str(e)}")
sys.exit(1)
def apply_protection_measures(self):
"""应用防护措施"""
if self.anti_debug_enabled:
self.enable_anti_debug()
if self.anti_vm_enabled:
self.enable_anti_vm()
def enable_anti_debug(self):
"""启用反调试保护"""
try:
if os.name == 'nt': # Windows
# 检查调试器
if ctypes.windll.kernel32.IsDebuggerPresent():
sys.exit(0)
# 设置调试标志
ctypes.windll.kernel32.SetProcessDEPPolicy(0x00000001)
except Exception:
pass
def enable_anti_vm(self):
"""启用反虚拟机保护"""
try:
if os.name == 'nt': # Windows
# 检查常见的虚拟机进程
vm_processes = [
'vmsrvc.exe', 'vmusrvc.exe', 'vmtoolsd.exe',
'vboxservice.exe', 'vboxtray.exe', 'vboxcontrol.exe',
'vmwaretray.exe', 'vmwareuser.exe', 'vmusrvc.exe'
]
for proc in psutil.process_iter(['name']):
try:
if proc.info['name'].lower() in vm_processes:
sys.exit(0)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except Exception:
pass
def monitor_process(self, pid):
"""监控进程状态"""
try:
while True:
time.sleep(5) # 每5秒检查一次
# 检查进程是否还在运行
try:
process = psutil.Process(pid)
if not process.is_running():
break
except psutil.NoSuchProcess:
break
# 检查是否有调试器附加
if self.anti_debug_enabled:
try:
if ctypes.windll.kernel32.IsDebuggerPresent():
process.terminate()
break
except:
pass
except Exception:
pass
finally:
# 清理临时文件
self.cleanup_temp_files()
def cleanup_temp_files(self):
"""清理临时文件"""
try:
if self.original_exe_path and os.path.exists(self.original_exe_path):
os.remove(self.original_exe_path)
except:
pass
import requests, socket, os, json, tempfile, base64, time
from cryptography.fernet import Fernet
CACHE_FILE = LICENSE_FILE
def online_check(host='taiyiagi.xyz', port=3306, timeout=3):
"""简单检测能否连上数据库服务器"""
try:
socket.create_connection((host, port), timeout)
return True
except:
return False
def verify_with_fallback(validator, key):
"""有网:联网复检;无网:本地缓存兜底"""
db_config = validator.load_resource_data()[1]['db_config']
host = db_config.get('host', 'localhost')
port = int(db_config.get('port', 3306))
if online_check(host, port):
# 有网:强制联网验证
return validator.validate_license(key, validator.get_machine_code())
else:
# 无网尝试使用本地缓存LICENSE_FILE
try:
if os.path.exists(LICENSE_FILE):
with open(LICENSE_FILE, 'rb') as f:
data = json.loads(_get_fernet().decrypt(f.read()).decode())
cached_key = data.get('key')
if cached_key:
return validator.validate_license(cached_key, validator.get_machine_code())
except Exception:
pass
return False, "网络不可用且本地缓存失效"
def main():
validator = EXEWrapperValidator()
# 控制台调试模式(方便调试)
if len(sys.argv) > 1 and sys.argv[1] == '--console':
print("调试模式")
machine_code = validator.get_machine_code()
print(f"机器码: {machine_code}")
license_key = input("请输入激活码: ").strip()
if license_key:
success, msg = validator.validate_license(license_key, machine_code)
print(f"验证结果: {success} - {msg}")
if success:
save_license(license_key)
extracted = validator.extract_original_program()
if extracted:
validator.launch_program_with_protection(extracted)
return
# 1) 启动时优先加载本地缓存卡密(自动验证)
cached_key = load_license()
if cached_key:
success, msg = validator.validate_license(cached_key, validator.get_machine_code())
if success:
extracted = validator.extract_original_program()
if extracted:
validator.launch_program_with_protection(extracted)
return
else:
# 本地缓存不可用,删除并提示
try:
os.remove(LICENSE_FILE)
except:
pass
try:
# 尝试用GUI提示如果可用
tmp = tk.Tk(); tmp.withdraw()
messagebox.showwarning("提示", f"本地授权失效:{msg}")
tmp.destroy()
except:
print("本地授权失效:", msg)
# 2) 无有效授权,弹出 GUI 输入框
validator.show_license_dialog()
if __name__ == "__main__":
main()