247 lines
7.6 KiB
Python
247 lines
7.6 KiB
Python
# wrapper_embed.py – 由 PyInstaller 打包,运行时提取并解密原始 EXE
|
||
import os, sys, json, hashlib, tempfile, subprocess, struct
|
||
from datetime import datetime
|
||
import mysql.connector
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
import platform
|
||
|
||
MAGIC = b'ENC_MAGIC'
|
||
HEAD = 512
|
||
KEY = b'EXEWrapper#2024'
|
||
|
||
|
||
def xor(data):
|
||
return bytes(d ^ KEY[i % len(KEY)] for i, d in enumerate(data))
|
||
|
||
|
||
def get_machine_code():
|
||
import uuid
|
||
return hashlib.md5(str(uuid.getnode()).encode()).hexdigest()[:16].upper()
|
||
|
||
|
||
def validate(key, machine):
|
||
try:
|
||
conn = mysql.connector.connect(
|
||
host='taiyiagi.xyz', user='taiyi', password='taiyi1224',
|
||
database='filesend', connection_timeout=10, autocommit=True
|
||
)
|
||
cur = conn.cursor(dictionary=True)
|
||
cur.execute('SELECT * FROM license_keys WHERE key_code=%s', (key,))
|
||
print("验证激活码")
|
||
rec = cur.fetchone()
|
||
if not rec: return False, '激活码不存在'
|
||
if rec['status'] == 'banned': return False, '激活码被封禁'
|
||
if rec['end_time'] < datetime.now(): return False, '激活码已过期'
|
||
if rec['status'] == 'active':
|
||
return (True, '验证成功') if rec['machine_code'] == machine else (False, '激活码已绑定其他设备')
|
||
if rec['status'] == 'unused':
|
||
cur.execute('UPDATE license_keys SET status=%s, machine_code=%s, start_time=NOW() WHERE key_code=%s',
|
||
('active', machine, key))
|
||
return cur.rowcount > 0, '激活成功' if cur.rowcount else '激活失败'
|
||
return False, '激活码状态异常'
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
def get_bundled_data_path():
|
||
"""获取打包数据文件路径"""
|
||
if getattr(sys, 'frozen', False):
|
||
# 如果是打包后的exe,数据文件在临时目录中
|
||
bundle_dir = sys._MEIPASS
|
||
else:
|
||
# 如果是脚本运行,数据文件在当前目录
|
||
bundle_dir = os.path.dirname(os.path.abspath(__file__))
|
||
|
||
# 查找 .dat 文件
|
||
for file in os.listdir(bundle_dir):
|
||
if file.endswith('.dat'):
|
||
return os.path.join(bundle_dir, file)
|
||
return None
|
||
|
||
|
||
def extract_exe():
|
||
"""从打包的数据文件中提取并解密原始EXE"""
|
||
try:
|
||
# 首先尝试从打包的数据文件中读取
|
||
data_file = get_bundled_data_path()
|
||
if data_file and os.path.exists(data_file):
|
||
with open(data_file, 'rb') as f:
|
||
data = f.read()
|
||
else:
|
||
# 如果没有找到数据文件,尝试从当前exe文件末尾读取
|
||
if getattr(sys, 'frozen', False):
|
||
exe = sys.executable
|
||
with open(exe, 'rb') as f:
|
||
data = f.read()
|
||
else:
|
||
print("未找到加密数据文件")
|
||
return None
|
||
|
||
# 查找魔数位置
|
||
pos = data.find(MAGIC)
|
||
if pos == -1:
|
||
print("未找到加密数据标识")
|
||
return None
|
||
|
||
pos += len(MAGIC)
|
||
|
||
# 读取头部信息
|
||
if pos + HEAD > len(data):
|
||
print("数据文件损坏:头部信息不完整")
|
||
return None
|
||
|
||
header_data = data[pos:pos + HEAD].rstrip(b'\0')
|
||
try:
|
||
hdr = json.loads(header_data.decode())
|
||
except json.JSONDecodeError as e:
|
||
print(f"头部信息解析失败: {e}")
|
||
return None
|
||
|
||
pos += HEAD
|
||
|
||
# 解密数据
|
||
if pos >= len(data):
|
||
print("数据文件损坏:没有加密数据")
|
||
return None
|
||
|
||
encrypted_data = data[pos:]
|
||
if len(encrypted_data) == 0:
|
||
print("加密数据为空")
|
||
return None
|
||
|
||
dec = xor(encrypted_data)
|
||
|
||
# 验证文件完整性
|
||
if hashlib.sha256(dec).hexdigest() != hdr['hash']:
|
||
print("文件完整性校验失败")
|
||
return None
|
||
|
||
if len(dec) != hdr['size']:
|
||
print(f"文件大小不匹配,期望: {hdr['size']}, 实际: {len(dec)}")
|
||
return None
|
||
|
||
# 写入临时文件
|
||
temp_dir = tempfile.gettempdir()
|
||
filename = hdr['filename']
|
||
tmp = os.path.join(temp_dir, f"temp_{os.getpid()}_{filename}")
|
||
|
||
try:
|
||
with open(tmp, 'wb') as f:
|
||
f.write(dec)
|
||
|
||
# 设置可执行权限
|
||
if hasattr(os, 'chmod'):
|
||
os.chmod(tmp, 0o755)
|
||
|
||
print(f"成功提取文件到: {tmp}")
|
||
return tmp
|
||
|
||
except Exception as e:
|
||
print(f"写入临时文件失败: {e}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"提取EXE文件时发生错误: {e}")
|
||
return None
|
||
|
||
|
||
def run(path):
|
||
"""运行解密后的程序"""
|
||
try:
|
||
if not os.path.exists(path):
|
||
print(f"文件不存在: {path}")
|
||
return False
|
||
|
||
print(f"正在启动程序: {path}")
|
||
|
||
if os.name == 'nt': # Windows
|
||
# 使用 CREATE_NEW_CONSOLE 标志在新窗口中启动
|
||
import subprocess
|
||
subprocess.Popen([path],
|
||
creationflags=subprocess.CREATE_NEW_CONSOLE,
|
||
close_fds=False)
|
||
else: # Linux/macOS
|
||
subprocess.Popen([path], close_fds=True)
|
||
|
||
# 给程序一些启动时间
|
||
import time
|
||
time.sleep(1)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"启动程序失败: {e}")
|
||
return False
|
||
|
||
|
||
def gui():
|
||
def ok():
|
||
k = e.get().strip()
|
||
if not k:
|
||
return messagebox.showerror('错误', '请输入激活码')
|
||
|
||
try:
|
||
m = get_machine_code()
|
||
success, msg = validate(k, m)
|
||
|
||
if success:
|
||
messagebox.showinfo('成功', '激活成功!正在启动程序...')
|
||
root.destroy()
|
||
|
||
# 提取并运行程序
|
||
exe = extract_exe()
|
||
if exe:
|
||
if run(exe):
|
||
print("程序启动成功")
|
||
# 延迟退出,给目标程序启动时间
|
||
import time
|
||
time.sleep(2)
|
||
sys.exit(0)
|
||
else:
|
||
messagebox.showerror('错误', '程序启动失败')
|
||
else:
|
||
messagebox.showerror('错误', '无法提取程序文件,请检查文件完整性')
|
||
else:
|
||
messagebox.showerror('失败', msg)
|
||
|
||
except Exception as e:
|
||
messagebox.showerror('错误', f'验证过程出错: {str(e)}')
|
||
|
||
root = tk.Tk()
|
||
root.title('软件激活')
|
||
root.geometry('400x200')
|
||
root.resizable(False, False)
|
||
|
||
# 居中显示窗口
|
||
root.eval('tk::PlaceWindow . center')
|
||
|
||
# 界面元素
|
||
tk.Label(root, text='请输入激活码', font=('', 13)).pack(pady=15)
|
||
e = tk.Entry(root, width=25)
|
||
e.pack()
|
||
e.focus()
|
||
|
||
tk.Button(root, text='验证并启动', command=ok).pack(pady=15)
|
||
|
||
# 显示机器码(用于调试)
|
||
machine_code = get_machine_code()
|
||
tk.Label(root, text=f'机器码: {machine_code}', font=('', 8), fg='gray').pack(pady=(10, 0))
|
||
|
||
root.bind('<Return>', lambda x: ok())
|
||
root.protocol('WM_DELETE_WINDOW', sys.exit)
|
||
root.mainloop()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 添加调试信息
|
||
print("程序启动...")
|
||
print(f"Python版本: {sys.version}")
|
||
print(f"当前工作目录: {os.getcwd()}")
|
||
print(f"是否为打包环境: {getattr(sys, 'frozen', False)}")
|
||
|
||
if getattr(sys, 'frozen', False):
|
||
print(f"打包临时目录: {sys._MEIPASS}")
|
||
print(f"打包临时目录内容: {os.listdir(sys._MEIPASS)}")
|
||
|
||
gui() |