222 lines
7.6 KiB
Python
222 lines
7.6 KiB
Python
import os
|
||
import sys
|
||
import hashlib
|
||
from cryptography.fernet import Fernet
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
from cryptography.hazmat.backends import default_backend
|
||
import base64
|
||
import shutil
|
||
|
||
|
||
class EXEEncryptor:
|
||
def __init__(self, key=None):
|
||
"""初始化加密器,如果没有提供密钥则生成一个新的"""
|
||
if key:
|
||
self.key = key
|
||
else:
|
||
self.key = Fernet.generate_key()
|
||
|
||
self.fernet = Fernet(self.key)
|
||
# 使用特殊分隔符标记加密数据的开始和结束
|
||
self.separator = b"<<EXE_ENCRYPTED_DATA>>"
|
||
|
||
def derive_key(self, password, salt=None):
|
||
"""从密码派生密钥"""
|
||
if not salt:
|
||
salt = os.urandom(16)
|
||
|
||
kdf = PBKDF2HMAC(
|
||
algorithm=hashes.SHA256(),
|
||
length=32,
|
||
salt=salt,
|
||
iterations=100000,
|
||
backend=default_backend()
|
||
)
|
||
|
||
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
|
||
return key, salt
|
||
|
||
def validate_exe_file(self, file_path):
|
||
"""验证EXE文件是否有效"""
|
||
try:
|
||
with open(file_path, 'rb') as f:
|
||
# 检查PE文件头
|
||
f.seek(0)
|
||
dos_header = f.read(2)
|
||
if dos_header != b'MZ':
|
||
return False, "不是有效的EXE文件"
|
||
|
||
# 检查文件大小
|
||
f.seek(0, 2) # 移动到文件末尾
|
||
size = f.tell()
|
||
if size < 1024: # 至少1KB
|
||
return False, "文件太小,可能不是有效的EXE文件"
|
||
|
||
return True, "文件验证通过"
|
||
except Exception as e:
|
||
return False, f"文件验证失败: {str(e)}"
|
||
|
||
def encrypt_file(self, source_path, dest_path, validator_path, db_config):
|
||
"""
|
||
加密EXE文件
|
||
source_path: 源EXE文件路径
|
||
dest_path: 加密后文件保存路径
|
||
validator_path: 验证程序路径
|
||
db_config: 数据库配置信息
|
||
"""
|
||
try:
|
||
# 验证源文件
|
||
valid, msg = self.validate_exe_file(source_path)
|
||
if not valid:
|
||
return False, f"源文件验证失败: {msg}"
|
||
|
||
# 验证验证程序
|
||
valid, msg = self.validate_exe_file(validator_path)
|
||
if not valid:
|
||
return False, f"验证程序文件验证失败: {msg}"
|
||
|
||
# 读取源文件内容
|
||
with open(source_path, 'rb') as f:
|
||
source_data = f.read()
|
||
|
||
print(f"源文件大小: {len(source_data)} 字节")
|
||
|
||
# 加密源文件内容
|
||
encrypted_data = self.fernet.encrypt(source_data)
|
||
print(f"加密后数据大小: {len(encrypted_data)} 字节")
|
||
|
||
# 读取验证程序内容
|
||
with open(validator_path, 'rb') as f:
|
||
validator_data = f.read()
|
||
|
||
print(f"验证程序大小: {len(validator_data)} 字节")
|
||
|
||
# 准备要写入的数据库配置信息
|
||
config_str = f"{db_config['host']}|{db_config['database']}|{db_config['user']}|{db_config['password']}"
|
||
config_data = config_str.encode('utf-8')
|
||
|
||
# 写入加密后的文件
|
||
with open(dest_path, 'wb') as f:
|
||
# 先写入验证程序
|
||
f.write(validator_data)
|
||
print(f"写入验证程序: {len(validator_data)} 字节")
|
||
|
||
# 写入分隔符
|
||
f.write(self.separator)
|
||
|
||
# 写入数据库配置
|
||
f.write(config_data)
|
||
print(f"写入配置数据: {len(config_data)} 字节")
|
||
|
||
f.write(self.separator)
|
||
|
||
# 写入加密密钥
|
||
f.write(self.key)
|
||
print(f"写入密钥: {len(self.key)} 字节")
|
||
|
||
f.write(self.separator)
|
||
|
||
# 写入加密的源文件数据
|
||
f.write(encrypted_data)
|
||
print(f"写入加密数据: {len(encrypted_data)} 字节")
|
||
|
||
# 复制原验证程序的权限到新文件
|
||
try:
|
||
shutil.copystat(validator_path, dest_path)
|
||
except:
|
||
pass
|
||
|
||
# 设置文件可执行权限
|
||
if os.name == 'nt':
|
||
# Windows系统 - 设置文件属性
|
||
import stat
|
||
current_mode = os.stat(dest_path).st_mode
|
||
os.chmod(dest_path, current_mode | stat.S_IEXEC)
|
||
else:
|
||
# Unix-like系统
|
||
os.chmod(dest_path, 0o755)
|
||
|
||
# 验证生成的文件
|
||
final_size = os.path.getsize(dest_path)
|
||
print(f"最终文件大小: {final_size} 字节")
|
||
|
||
return True, "文件加密成功"
|
||
|
||
except Exception as e:
|
||
return False, f"加密失败: {str(e)}"
|
||
|
||
def decrypt_data(self, encrypted_data):
|
||
"""解密数据"""
|
||
try:
|
||
return True, self.fernet.decrypt(encrypted_data)
|
||
except Exception as e:
|
||
return False, f"解密失败: {str(e)}"
|
||
|
||
@staticmethod
|
||
def split_encrypted_file(data):
|
||
"""分割加密文件中的各个部分"""
|
||
separator = b"<<EXE_ENCRYPTED_DATA>>"
|
||
parts = data.split(separator)
|
||
|
||
if len(parts) < 4:
|
||
return None, "无效的加密文件格式"
|
||
|
||
# 各个部分的结构: [验证程序, 数据库配置, 密钥, 加密数据]
|
||
return {
|
||
'validator': parts[0],
|
||
'db_config': parts[1].decode('utf-8'),
|
||
'key': parts[2],
|
||
'encrypted_data': separator.join(parts[3:]) # 处理可能包含分隔符的数据
|
||
}, "成功"
|
||
|
||
def test_encrypted_file(self, encrypted_file_path):
|
||
"""测试加密文件的完整性"""
|
||
try:
|
||
with open(encrypted_file_path, 'rb') as f:
|
||
data = f.read()
|
||
|
||
# 查找分隔符
|
||
separators = []
|
||
start = 0
|
||
while True:
|
||
pos = data.find(self.separator, start)
|
||
if pos == -1:
|
||
break
|
||
separators.append(pos)
|
||
start = pos + len(self.separator)
|
||
|
||
if len(separators) < 3:
|
||
return False, f"分隔符数量不足: {len(separators)}"
|
||
|
||
print(f"找到 {len(separators)} 个分隔符")
|
||
print(f"分隔符位置: {separators}")
|
||
|
||
# 提取并测试各部分
|
||
try:
|
||
db_config_start = separators[0] + len(self.separator)
|
||
db_config_end = separators[1]
|
||
db_config_str = data[db_config_start:db_config_end].decode('utf-8')
|
||
print(f"数据库配置: {db_config_str}")
|
||
|
||
key_start = separators[1] + len(self.separator)
|
||
key_end = separators[2]
|
||
key = data[key_start:key_end]
|
||
print(f"密钥长度: {len(key)}")
|
||
|
||
encrypted_data_start = separators[2] + len(self.separator)
|
||
encrypted_data = data[encrypted_data_start:]
|
||
print(f"加密数据长度: {len(encrypted_data)}")
|
||
|
||
# 测试解密
|
||
fernet = Fernet(key)
|
||
decrypted = fernet.decrypt(encrypted_data)
|
||
print(f"解密成功,原始数据长度: {len(decrypted)}")
|
||
|
||
return True, "文件完整性测试通过"
|
||
|
||
except Exception as e:
|
||
return False, f"数据解析失败: {str(e)}"
|
||
|
||
except Exception as e:
|
||
return False, f"文件测试失败: {str(e)}" |