Kamixitong/auth_validator.py

522 lines
17 KiB
Python
Raw Normal View History

2025-11-11 21:39:12 +08:00
"""
Python软件授权验证器
一款轻量级的软件授权验证模块支持在线/离线验证模式
使用方法:
from auth_validator import AuthValidator
validator = AuthValidator(software_id="your_software_id")
if not validator.validate():
exit() # 验证失败退出程序
"""
import os
import json
import time
import hashlib
import requests
from datetime import datetime, timedelta
from typing import Optional, Tuple, Dict, Any
# 内置机器码生成器独立版本不依赖app.utils
class MachineCodeGenerator:
"""独立版本的机器码生成器"""
@staticmethod
def generate() -> str:
"""生成32位机器码"""
import platform
import uuid
import subprocess
hw_info = []
try:
# 获取系统信息
system = platform.system().lower()
# 系统UUID
try:
system_uuid = str(uuid.getnode())
if system_uuid and system_uuid != '0':
hw_info.append(system_uuid)
except:
pass
# 主机名
try:
hostname = platform.node()
if hostname:
hw_info.append(hostname)
except:
pass
# 系统信息
try:
system_info = f"{system}_{platform.release()}_{platform.machine()}"
hw_info.append(system_info)
except:
pass
# Python版本作为备用
try:
python_version = platform.python_version()
hw_info.append(python_version)
except:
pass
except Exception as e:
print(f"获取硬件信息时出错: {e}")
# 如果没有获取到任何信息使用随机UUID
if not hw_info:
hw_info = [str(uuid.uuid4()), str(uuid.uuid4())]
# 生成哈希
combined_info = '|'.join(hw_info)
hash_obj = hashlib.sha256(combined_info.encode('utf-8'))
machine_code = hash_obj.hexdigest()[:32].upper()
return machine_code
# 简化的加密工具
class SimpleCrypto:
"""简单的加密解密工具"""
@staticmethod
def generate_hash(data: str, salt: str = "") -> str:
"""生成哈希值"""
combined = f"{data}{salt}".encode('utf-8')
return hashlib.sha256(combined).hexdigest()
@staticmethod
def generate_signature(data: str, secret_key: str) -> str:
"""生成签名"""
combined = f"{data}{secret_key}".encode('utf-8')
return hashlib.sha256(combined).hexdigest()
class AuthCache:
"""授权信息缓存管理"""
def __init__(self, cache_file: str = ".auth_cache"):
self.cache_file = cache_file
self.cache_data = self._load_cache()
def _load_cache(self) -> Dict[str, Any]:
"""加载缓存"""
try:
if os.path.exists(self.cache_file):
with open(self.cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
pass
return {}
def _save_cache(self):
"""保存缓存"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
except Exception:
pass
def get_auth_info(self, software_id: str) -> Optional[Dict[str, Any]]:
"""获取授权信息"""
key = f"auth_{software_id}"
return self.cache_data.get(key)
def set_auth_info(self, software_id: str, auth_info: Dict[str, Any]):
"""设置授权信息"""
key = f"auth_{software_id}"
self.cache_data[key] = auth_info
self._save_cache()
def clear_cache(self, software_id = None):
"""清除缓存"""
if software_id:
key = f"auth_{software_id}"
self.cache_data.pop(key, None)
else:
self.cache_data.clear()
self._save_cache()
class AuthValidator:
"""授权验证器主类"""
def __init__(self,
software_id: str,
api_url: str = "http://localhost:5000/api/v1",
secret_key: str = "default-secret-key",
cache_days: int = 7,
timeout: int = 3,
gui_mode: bool = False):
"""
初始化验证器
Args:
software_id: 软件ID在后台创建产品时生成
api_url: 后台API地址
secret_key: 验证密钥
cache_days: 离线缓存有效期
timeout: 网络请求超时时间
gui_mode: 是否使用图形界面输入卡密
"""
self.software_id = software_id
self.api_url = api_url.rstrip('/')
self.secret_key = secret_key
self.cache_days = cache_days
self.timeout = timeout
self.gui_mode = gui_mode
# 初始化组件
self.machine_generator = MachineCodeGenerator()
self.cache = AuthCache()
self.machine_code = self._get_or_generate_machine_code()
# 验证状态
self.failed_attempts = 0
self.last_attempt_time = None
self.locked_until = None
def _get_or_generate_machine_code(self) -> str:
"""获取或生成机器码"""
cache_file = ".machine_code"
# 尝试从缓存加载
try:
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
cached_code = f.read().strip()
if len(cached_code) == 32:
return cached_code
except Exception:
pass
# 生成新的机器码
machine_code = self.machine_generator.generate()
# 保存到缓存
try:
with open(cache_file, 'w') as f:
f.write(machine_code)
except Exception:
pass
return machine_code
def _is_locked(self) -> bool:
"""检查是否被锁定"""
if not self.locked_until:
return False
if datetime.utcnow() > self.locked_until:
self.locked_until = None
self.failed_attempts = 0
return False
return True
def _lock_account(self, minutes: int = 10):
"""锁定账号"""
self.locked_until = datetime.utcnow() + timedelta(minutes=minutes)
def _get_lock_message(self) -> str:
"""获取锁定消息"""
if self.locked_until:
remaining_minutes = int((self.locked_until - datetime.utcnow()).total_seconds() / 60)
return f"验证失败次数过多,请等待 {remaining_minutes} 分钟后再试"
return ""
def _validate_license_format(self, license_key: str) -> bool:
"""验证卡密格式支持XXXX-XXXX-XXXX-XXXX格式"""
if not license_key:
return False
license_key = license_key.strip().replace(' ', '').replace('\t', '').upper()
# 检查是否为XXXX-XXXX-XXXX-XXXX格式
if '-' in license_key:
parts = license_key.split('-')
# 应该有4部分每部分8个字符
if len(parts) == 4 and all(len(part) == 8 for part in parts):
# 检查所有字符是否为大写字母或数字
combined = ''.join(parts)
if len(combined) == 32:
import re
pattern = r'^[A-Z0-9]+$'
return bool(re.match(pattern, combined))
return False
else:
# 兼容旧格式检查长度16-32位
if len(license_key) < 16 or len(license_key) > 32:
return False
# 检查字符(只允许大写字母、数字和下划线)
import re
pattern = r'^[A-Z0-9_]+$'
return bool(re.match(pattern, license_key))
def _input_license_key(self) -> str:
"""输入卡密"""
if self.gui_mode:
try:
import tkinter as tk
from tkinter import simpledialog, messagebox
root = tk.Tk()
root.withdraw() # 隐藏主窗口
title = "软件授权验证"
prompt = f"软件ID: {self.software_id}\n机器码: {self.machine_code[:8]}...\n\n请输入您的卡密:"
license_key = simpledialog.askstring(title, prompt)
if not license_key:
return ""
root.destroy()
return license_key.strip()
except ImportError:
# 如果tkinter不可用使用命令行输入
pass
# 命令行输入
print(f"\n=== 软件授权验证 ===")
print(f"软件ID: {self.software_id}")
print(f"机器码: {self.machine_code[:8]}...{self.machine_code[-8:]}")
print(f"提示: 试用卡密以 'TRIAL_' 开头")
while True:
license_key = input("请输入卡密: ").strip()
if license_key:
return license_key
print("卡密不能为空,请重新输入。")
def _show_message(self, title: str, message: str, is_error: bool = False):
"""显示消息"""
if self.gui_mode:
try:
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.withdraw()
messagebox.showerror(title, message) if is_error else messagebox.showinfo(title, message)
root.destroy()
return
except ImportError:
pass
# 命令行输出
print(f"\n=== {title} ===")
print(message)
def _online_verify(self, license_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
"""在线验证"""
try:
url = f"{self.api_url}/auth/verify"
# 准备请求数据
# 使用与服务端一致的时间戳生成方式
timestamp = int(datetime.utcnow().timestamp())
data = {
"software_id": self.software_id,
"license_key": license_key,
"machine_code": self.machine_code,
"timestamp": timestamp
}
# 生成验证签名
signature_data = f"{data['software_id']}{data['license_key']}{data['machine_code']}{data['timestamp']}"
signature = SimpleCrypto.generate_signature(signature_data, self.secret_key)
data["signature"] = signature
# 发送请求
response = requests.post(
url,
json=data,
timeout=self.timeout,
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
result = response.json()
if result.get('success'):
auth_info = result.get('data', {})
return True, "验证成功", auth_info
else:
return False, result.get('message', '验证失败'), None
else:
return False, f"网络请求失败: {response.status_code}", None
except requests.exceptions.Timeout:
return False, "网络超时,请检查网络连接", None
except requests.exceptions.ConnectionError:
return False, "无法连接到服务器", None
except Exception as e:
return False, f"验证过程出错: {str(e)}", None
def _offline_verify(self) -> Tuple[bool, str]:
"""离线验证"""
auth_info = self.cache.get_auth_info(self.software_id)
if not auth_info:
return False, "未找到有效的离线授权信息,请联网验证"
# 检查缓存时间
cache_time = datetime.fromisoformat(auth_info.get('cache_time', ''))
if datetime.utcnow() - cache_time > timedelta(days=self.cache_days):
return False, f"离线授权已过期(超过{self.cache_days}天),请联网验证"
# 检查机器码匹配
if auth_info.get('machine_code') != self.machine_code:
return False, "设备信息不匹配,请重新验证"
# 检查有效期
expire_time = auth_info.get('expire_time')
if expire_time:
expire_datetime = datetime.fromisoformat(expire_time)
if datetime.utcnow() > expire_datetime:
return False, "授权已过期,请重新验证"
return True, "离线验证成功"
def _cache_auth_info(self, auth_info: Dict[str, Any]):
"""缓存授权信息"""
auth_info['cache_time'] = datetime.utcnow().isoformat()
self.cache.set_auth_info(self.software_id, auth_info)
def validate(self) -> bool:
"""
执行验证流程
Returns:
bool: 验证是否成功
"""
# 检查锁定状态
if self._is_locked():
self._show_message("验证锁定", self._get_lock_message(), True)
return False
# 首先尝试离线验证
offline_success, offline_message = self._offline_verify()
if offline_success:
return True
# 离线验证失败,尝试在线验证
max_attempts = 3 # 最多尝试3次
for attempt in range(max_attempts):
# 输入卡密
license_key = self._input_license_key()
if not license_key:
self._show_message("验证取消", "未输入卡密,程序退出", True)
return False
# 验证卡密格式
if not self._validate_license_format(license_key):
self._show_message("格式错误", "卡密格式错误,请检查后重新输入", True)
continue
# 在线验证
success, message, auth_info = self._online_verify(license_key)
if success and auth_info:
# 验证成功,缓存授权信息
self._cache_auth_info(auth_info)
# 检查是否需要更新
force_update = auth_info.get('force_update', False)
download_url = auth_info.get('download_url')
new_version = auth_info.get('new_version')
if force_update and download_url:
self._show_message("需要更新", f"发现新版本 {new_version}\n请下载更新后重新启动程序", True)
# 尝试打开下载链接
try:
import webbrowser
webbrowser.open(download_url)
except:
pass
return False
self._show_message("验证成功", f"授权验证成功!\n卡密: {license_key}\n有效期至: {auth_info.get('expire_time', '永久')}")
return True
else:
# 验证失败
self.failed_attempts += 1
self.last_attempt_time = datetime.utcnow()
if self.failed_attempts >= 5: # 失败5次锁定
self._lock_account()
self._show_message("验证失败", self._get_lock_message(), True)
return False
self._show_message("验证失败", f"验证失败: {message}\n剩余尝试次数: {5 - self.failed_attempts}", True)
# 如果不是最后一次尝试,询问是否继续
if attempt < max_attempts - 1:
if self.gui_mode:
try:
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.withdraw()
result = messagebox.askyesno("继续验证", "是否继续输入卡密验证?")
root.destroy()
if not result:
return False
except ImportError:
continue
else:
continue
else:
return False
return False
def get_software_info(self) -> Optional[Dict[str, Any]]:
"""获取软件信息"""
try:
url = f"{self.api_url}/software/info"
params = {"software_id": self.software_id}
response = requests.get(url, params=params, timeout=self.timeout)
if response.status_code == 200:
result = response.json()
if result.get('success'):
return result.get('data')
except Exception:
pass
return None
def clear_cache(self):
"""清除本地缓存"""
self.cache.clear_cache(self.software_id)
# 删除机器码缓存文件
try:
if os.path.exists(".machine_code"):
os.remove(".machine_code")
except Exception:
pass
# 便捷函数
def validate_license(software_id: str, **kwargs) -> bool:
"""
便捷的验证函数
Args:
software_id: 软件ID
**kwargs: 其他参数api_url, secret_key, cache_days, timeout, gui_mode
Returns:
bool: 验证是否成功
"""
validator = AuthValidator(software_id, **kwargs)
return validator.validate()
def get_machine_code() -> str:
"""获取当前机器码"""
return MachineCodeGenerator.generate()