522 lines
17 KiB
Python
522 lines
17 KiB
Python
"""
|
||
Python软件授权验证器
|
||
一款轻量级的软件授权验证模块,支持在线/离线验证模式
|
||
|
||
使用方法:
|
||
from auth_validator import AuthValidator
|
||
|
||
validator = AuthValidator(software_id="your_software_id")
|
||
if not validator.validate():
|
||
exit() # 验证失败退出程序
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import hashlib
|
||
import requests
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Tuple, Dict, Any
|
||
|
||
# 内置机器码生成器(独立版本,不依赖app.utils)
|
||
class MachineCodeGenerator:
|
||
"""独立版本的机器码生成器"""
|
||
|
||
@staticmethod
|
||
def generate() -> str:
|
||
"""生成32位机器码"""
|
||
import platform
|
||
import uuid
|
||
import subprocess
|
||
|
||
hw_info = []
|
||
|
||
try:
|
||
# 获取系统信息
|
||
system = platform.system().lower()
|
||
|
||
# 系统UUID
|
||
try:
|
||
system_uuid = str(uuid.getnode())
|
||
if system_uuid and system_uuid != '0':
|
||
hw_info.append(system_uuid)
|
||
except:
|
||
pass
|
||
|
||
# 主机名
|
||
try:
|
||
hostname = platform.node()
|
||
if hostname:
|
||
hw_info.append(hostname)
|
||
except:
|
||
pass
|
||
|
||
# 系统信息
|
||
try:
|
||
system_info = f"{system}_{platform.release()}_{platform.machine()}"
|
||
hw_info.append(system_info)
|
||
except:
|
||
pass
|
||
|
||
# Python版本(作为备用)
|
||
try:
|
||
python_version = platform.python_version()
|
||
hw_info.append(python_version)
|
||
except:
|
||
pass
|
||
|
||
except Exception as e:
|
||
print(f"获取硬件信息时出错: {e}")
|
||
|
||
# 如果没有获取到任何信息,使用随机UUID
|
||
if not hw_info:
|
||
hw_info = [str(uuid.uuid4()), str(uuid.uuid4())]
|
||
|
||
# 生成哈希
|
||
combined_info = '|'.join(hw_info)
|
||
hash_obj = hashlib.sha256(combined_info.encode('utf-8'))
|
||
machine_code = hash_obj.hexdigest()[:32].upper()
|
||
|
||
return machine_code
|
||
|
||
# 简化的加密工具
|
||
class SimpleCrypto:
|
||
"""简单的加密解密工具"""
|
||
|
||
@staticmethod
|
||
def generate_hash(data: str, salt: str = "") -> str:
|
||
"""生成哈希值"""
|
||
combined = f"{data}{salt}".encode('utf-8')
|
||
return hashlib.sha256(combined).hexdigest()
|
||
|
||
@staticmethod
|
||
def generate_signature(data: str, secret_key: str) -> str:
|
||
"""生成签名"""
|
||
combined = f"{data}{secret_key}".encode('utf-8')
|
||
return hashlib.sha256(combined).hexdigest()
|
||
|
||
class AuthCache:
|
||
"""授权信息缓存管理"""
|
||
|
||
def __init__(self, cache_file: str = ".auth_cache"):
|
||
self.cache_file = cache_file
|
||
self.cache_data = self._load_cache()
|
||
|
||
def _load_cache(self) -> Dict[str, Any]:
|
||
"""加载缓存"""
|
||
try:
|
||
if os.path.exists(self.cache_file):
|
||
with open(self.cache_file, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception:
|
||
pass
|
||
return {}
|
||
|
||
def _save_cache(self):
|
||
"""保存缓存"""
|
||
try:
|
||
with open(self.cache_file, 'w', encoding='utf-8') as f:
|
||
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
|
||
except Exception:
|
||
pass
|
||
|
||
def get_auth_info(self, software_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取授权信息"""
|
||
key = f"auth_{software_id}"
|
||
return self.cache_data.get(key)
|
||
|
||
def set_auth_info(self, software_id: str, auth_info: Dict[str, Any]):
|
||
"""设置授权信息"""
|
||
key = f"auth_{software_id}"
|
||
self.cache_data[key] = auth_info
|
||
self._save_cache()
|
||
|
||
def clear_cache(self, software_id = None):
|
||
"""清除缓存"""
|
||
if software_id:
|
||
key = f"auth_{software_id}"
|
||
self.cache_data.pop(key, None)
|
||
else:
|
||
self.cache_data.clear()
|
||
self._save_cache()
|
||
|
||
class AuthValidator:
|
||
"""授权验证器主类"""
|
||
|
||
def __init__(self,
|
||
software_id: str,
|
||
api_url: str = "http://localhost:5000/api/v1",
|
||
secret_key: str = "default-secret-key",
|
||
cache_days: int = 7,
|
||
timeout: int = 3,
|
||
gui_mode: bool = False):
|
||
"""
|
||
初始化验证器
|
||
|
||
Args:
|
||
software_id: 软件ID(在后台创建产品时生成)
|
||
api_url: 后台API地址
|
||
secret_key: 验证密钥
|
||
cache_days: 离线缓存有效期(天)
|
||
timeout: 网络请求超时时间(秒)
|
||
gui_mode: 是否使用图形界面输入卡密
|
||
"""
|
||
self.software_id = software_id
|
||
self.api_url = api_url.rstrip('/')
|
||
self.secret_key = secret_key
|
||
self.cache_days = cache_days
|
||
self.timeout = timeout
|
||
self.gui_mode = gui_mode
|
||
|
||
# 初始化组件
|
||
self.machine_generator = MachineCodeGenerator()
|
||
self.cache = AuthCache()
|
||
self.machine_code = self._get_or_generate_machine_code()
|
||
|
||
# 验证状态
|
||
self.failed_attempts = 0
|
||
self.last_attempt_time = None
|
||
self.locked_until = None
|
||
|
||
def _get_or_generate_machine_code(self) -> str:
|
||
"""获取或生成机器码"""
|
||
cache_file = ".machine_code"
|
||
|
||
# 尝试从缓存加载
|
||
try:
|
||
if os.path.exists(cache_file):
|
||
with open(cache_file, 'r') as f:
|
||
cached_code = f.read().strip()
|
||
if len(cached_code) == 32:
|
||
return cached_code
|
||
except Exception:
|
||
pass
|
||
|
||
# 生成新的机器码
|
||
machine_code = self.machine_generator.generate()
|
||
|
||
# 保存到缓存
|
||
try:
|
||
with open(cache_file, 'w') as f:
|
||
f.write(machine_code)
|
||
except Exception:
|
||
pass
|
||
|
||
return machine_code
|
||
|
||
def _is_locked(self) -> bool:
|
||
"""检查是否被锁定"""
|
||
if not self.locked_until:
|
||
return False
|
||
|
||
if datetime.utcnow() > self.locked_until:
|
||
self.locked_until = None
|
||
self.failed_attempts = 0
|
||
return False
|
||
|
||
return True
|
||
|
||
def _lock_account(self, minutes: int = 10):
|
||
"""锁定账号"""
|
||
self.locked_until = datetime.utcnow() + timedelta(minutes=minutes)
|
||
|
||
def _get_lock_message(self) -> str:
|
||
"""获取锁定消息"""
|
||
if self.locked_until:
|
||
remaining_minutes = int((self.locked_until - datetime.utcnow()).total_seconds() / 60)
|
||
return f"验证失败次数过多,请等待 {remaining_minutes} 分钟后再试"
|
||
return ""
|
||
|
||
def _validate_license_format(self, license_key: str) -> bool:
|
||
"""验证卡密格式(支持XXXX-XXXX-XXXX-XXXX格式)"""
|
||
if not license_key:
|
||
return False
|
||
|
||
license_key = license_key.strip().replace(' ', '').replace('\t', '').upper()
|
||
|
||
# 检查是否为XXXX-XXXX-XXXX-XXXX格式
|
||
if '-' in license_key:
|
||
parts = license_key.split('-')
|
||
# 应该有4部分,每部分8个字符
|
||
if len(parts) == 4 and all(len(part) == 8 for part in parts):
|
||
# 检查所有字符是否为大写字母或数字
|
||
combined = ''.join(parts)
|
||
if len(combined) == 32:
|
||
import re
|
||
pattern = r'^[A-Z0-9]+$'
|
||
return bool(re.match(pattern, combined))
|
||
return False
|
||
else:
|
||
# 兼容旧格式:检查长度(16-32位)
|
||
if len(license_key) < 16 or len(license_key) > 32:
|
||
return False
|
||
|
||
# 检查字符(只允许大写字母、数字和下划线)
|
||
import re
|
||
pattern = r'^[A-Z0-9_]+$'
|
||
return bool(re.match(pattern, license_key))
|
||
|
||
def _input_license_key(self) -> str:
|
||
"""输入卡密"""
|
||
if self.gui_mode:
|
||
try:
|
||
import tkinter as tk
|
||
from tkinter import simpledialog, messagebox
|
||
|
||
root = tk.Tk()
|
||
root.withdraw() # 隐藏主窗口
|
||
|
||
title = "软件授权验证"
|
||
prompt = f"软件ID: {self.software_id}\n机器码: {self.machine_code[:8]}...\n\n请输入您的卡密:"
|
||
|
||
license_key = simpledialog.askstring(title, prompt)
|
||
if not license_key:
|
||
return ""
|
||
|
||
root.destroy()
|
||
return license_key.strip()
|
||
|
||
except ImportError:
|
||
# 如果tkinter不可用,使用命令行输入
|
||
pass
|
||
|
||
# 命令行输入
|
||
print(f"\n=== 软件授权验证 ===")
|
||
print(f"软件ID: {self.software_id}")
|
||
print(f"机器码: {self.machine_code[:8]}...{self.machine_code[-8:]}")
|
||
print(f"提示: 试用卡密以 'TRIAL_' 开头")
|
||
|
||
while True:
|
||
license_key = input("请输入卡密: ").strip()
|
||
if license_key:
|
||
return license_key
|
||
print("卡密不能为空,请重新输入。")
|
||
|
||
def _show_message(self, title: str, message: str, is_error: bool = False):
|
||
"""显示消息"""
|
||
if self.gui_mode:
|
||
try:
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
|
||
root = tk.Tk()
|
||
root.withdraw()
|
||
messagebox.showerror(title, message) if is_error else messagebox.showinfo(title, message)
|
||
root.destroy()
|
||
return
|
||
except ImportError:
|
||
pass
|
||
|
||
# 命令行输出
|
||
print(f"\n=== {title} ===")
|
||
print(message)
|
||
|
||
def _online_verify(self, license_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
|
||
"""在线验证"""
|
||
try:
|
||
url = f"{self.api_url}/auth/verify"
|
||
|
||
# 准备请求数据
|
||
# 使用与服务端一致的时间戳生成方式
|
||
timestamp = int(datetime.utcnow().timestamp())
|
||
data = {
|
||
"software_id": self.software_id,
|
||
"license_key": license_key,
|
||
"machine_code": self.machine_code,
|
||
"timestamp": timestamp
|
||
}
|
||
|
||
# 生成验证签名
|
||
signature_data = f"{data['software_id']}{data['license_key']}{data['machine_code']}{data['timestamp']}"
|
||
signature = SimpleCrypto.generate_signature(signature_data, self.secret_key)
|
||
data["signature"] = signature
|
||
|
||
# 发送请求
|
||
response = requests.post(
|
||
url,
|
||
json=data,
|
||
timeout=self.timeout,
|
||
headers={'Content-Type': 'application/json'}
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('success'):
|
||
auth_info = result.get('data', {})
|
||
return True, "验证成功", auth_info
|
||
else:
|
||
return False, result.get('message', '验证失败'), None
|
||
else:
|
||
return False, f"网络请求失败: {response.status_code}", None
|
||
|
||
except requests.exceptions.Timeout:
|
||
return False, "网络超时,请检查网络连接", None
|
||
except requests.exceptions.ConnectionError:
|
||
return False, "无法连接到服务器", None
|
||
except Exception as e:
|
||
return False, f"验证过程出错: {str(e)}", None
|
||
|
||
def _offline_verify(self) -> Tuple[bool, str]:
|
||
"""离线验证"""
|
||
auth_info = self.cache.get_auth_info(self.software_id)
|
||
if not auth_info:
|
||
return False, "未找到有效的离线授权信息,请联网验证"
|
||
|
||
# 检查缓存时间
|
||
cache_time = datetime.fromisoformat(auth_info.get('cache_time', ''))
|
||
if datetime.utcnow() - cache_time > timedelta(days=self.cache_days):
|
||
return False, f"离线授权已过期(超过{self.cache_days}天),请联网验证"
|
||
|
||
# 检查机器码匹配
|
||
if auth_info.get('machine_code') != self.machine_code:
|
||
return False, "设备信息不匹配,请重新验证"
|
||
|
||
# 检查有效期
|
||
expire_time = auth_info.get('expire_time')
|
||
if expire_time:
|
||
expire_datetime = datetime.fromisoformat(expire_time)
|
||
if datetime.utcnow() > expire_datetime:
|
||
return False, "授权已过期,请重新验证"
|
||
|
||
return True, "离线验证成功"
|
||
|
||
def _cache_auth_info(self, auth_info: Dict[str, Any]):
|
||
"""缓存授权信息"""
|
||
auth_info['cache_time'] = datetime.utcnow().isoformat()
|
||
self.cache.set_auth_info(self.software_id, auth_info)
|
||
|
||
def validate(self) -> bool:
|
||
"""
|
||
执行验证流程
|
||
|
||
Returns:
|
||
bool: 验证是否成功
|
||
"""
|
||
# 检查锁定状态
|
||
if self._is_locked():
|
||
self._show_message("验证锁定", self._get_lock_message(), True)
|
||
return False
|
||
|
||
# 首先尝试离线验证
|
||
offline_success, offline_message = self._offline_verify()
|
||
if offline_success:
|
||
return True
|
||
|
||
# 离线验证失败,尝试在线验证
|
||
max_attempts = 3 # 最多尝试3次
|
||
|
||
for attempt in range(max_attempts):
|
||
# 输入卡密
|
||
license_key = self._input_license_key()
|
||
if not license_key:
|
||
self._show_message("验证取消", "未输入卡密,程序退出", True)
|
||
return False
|
||
|
||
# 验证卡密格式
|
||
if not self._validate_license_format(license_key):
|
||
self._show_message("格式错误", "卡密格式错误,请检查后重新输入", True)
|
||
continue
|
||
|
||
# 在线验证
|
||
success, message, auth_info = self._online_verify(license_key)
|
||
|
||
if success and auth_info:
|
||
# 验证成功,缓存授权信息
|
||
self._cache_auth_info(auth_info)
|
||
|
||
# 检查是否需要更新
|
||
force_update = auth_info.get('force_update', False)
|
||
download_url = auth_info.get('download_url')
|
||
new_version = auth_info.get('new_version')
|
||
|
||
if force_update and download_url:
|
||
self._show_message("需要更新", f"发现新版本 {new_version}\n请下载更新后重新启动程序", True)
|
||
# 尝试打开下载链接
|
||
try:
|
||
import webbrowser
|
||
webbrowser.open(download_url)
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
self._show_message("验证成功", f"授权验证成功!\n卡密: {license_key}\n有效期至: {auth_info.get('expire_time', '永久')}")
|
||
return True
|
||
|
||
else:
|
||
# 验证失败
|
||
self.failed_attempts += 1
|
||
self.last_attempt_time = datetime.utcnow()
|
||
|
||
if self.failed_attempts >= 5: # 失败5次锁定
|
||
self._lock_account()
|
||
self._show_message("验证失败", self._get_lock_message(), True)
|
||
return False
|
||
|
||
self._show_message("验证失败", f"验证失败: {message}\n剩余尝试次数: {5 - self.failed_attempts}", True)
|
||
|
||
# 如果不是最后一次尝试,询问是否继续
|
||
if attempt < max_attempts - 1:
|
||
if self.gui_mode:
|
||
try:
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
|
||
root = tk.Tk()
|
||
root.withdraw()
|
||
result = messagebox.askyesno("继续验证", "是否继续输入卡密验证?")
|
||
root.destroy()
|
||
|
||
if not result:
|
||
return False
|
||
except ImportError:
|
||
continue
|
||
else:
|
||
continue
|
||
else:
|
||
return False
|
||
|
||
return False
|
||
|
||
def get_software_info(self) -> Optional[Dict[str, Any]]:
|
||
"""获取软件信息"""
|
||
try:
|
||
url = f"{self.api_url}/software/info"
|
||
params = {"software_id": self.software_id}
|
||
|
||
response = requests.get(url, params=params, timeout=self.timeout)
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('success'):
|
||
return result.get('data')
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
def clear_cache(self):
|
||
"""清除本地缓存"""
|
||
self.cache.clear_cache(self.software_id)
|
||
# 删除机器码缓存文件
|
||
try:
|
||
if os.path.exists(".machine_code"):
|
||
os.remove(".machine_code")
|
||
except Exception:
|
||
pass
|
||
|
||
# 便捷函数
|
||
def validate_license(software_id: str, **kwargs) -> bool:
|
||
"""
|
||
便捷的验证函数
|
||
|
||
Args:
|
||
software_id: 软件ID
|
||
**kwargs: 其他参数(api_url, secret_key, cache_days, timeout, gui_mode)
|
||
|
||
Returns:
|
||
bool: 验证是否成功
|
||
"""
|
||
validator = AuthValidator(software_id, **kwargs)
|
||
return validator.validate()
|
||
|
||
def get_machine_code() -> str:
|
||
"""获取当前机器码"""
|
||
return MachineCodeGenerator.generate() |