""" Python软件授权验证器 一款轻量级的软件授权验证模块,支持在线/离线验证模式 使用方法: from auth_validator import AuthValidator validator = AuthValidator(software_id="your_software_id") if not validator.validate(): exit() # 验证失败退出程序 """ import os import json import time import hashlib import requests from datetime import datetime, timedelta from typing import Optional, Tuple, Dict, Any # 内置机器码生成器(独立版本,不依赖app.utils) class MachineCodeGenerator: """独立版本的机器码生成器""" @staticmethod def generate() -> str: """生成32位机器码""" import platform import uuid import subprocess hw_info = [] try: # 获取系统信息 system = platform.system().lower() # 系统UUID try: system_uuid = str(uuid.getnode()) if system_uuid and system_uuid != '0': hw_info.append(system_uuid) except: pass # 主机名 try: hostname = platform.node() if hostname: hw_info.append(hostname) except: pass # 系统信息 try: system_info = f"{system}_{platform.release()}_{platform.machine()}" hw_info.append(system_info) except: pass # Python版本(作为备用) try: python_version = platform.python_version() hw_info.append(python_version) except: pass except Exception as e: print(f"获取硬件信息时出错: {e}") # 如果没有获取到任何信息,使用随机UUID if not hw_info: hw_info = [str(uuid.uuid4()), str(uuid.uuid4())] # 生成哈希 combined_info = '|'.join(hw_info) hash_obj = hashlib.sha256(combined_info.encode('utf-8')) machine_code = hash_obj.hexdigest()[:32].upper() return machine_code # 简化的加密工具 class SimpleCrypto: """简单的加密解密工具""" @staticmethod def generate_hash(data: str, salt: str = "") -> str: """生成哈希值""" combined = f"{data}{salt}".encode('utf-8') return hashlib.sha256(combined).hexdigest() @staticmethod def generate_signature(data: str, secret_key: str) -> str: """生成签名""" combined = f"{data}{secret_key}".encode('utf-8') return hashlib.sha256(combined).hexdigest() class AuthCache: """授权信息缓存管理""" def __init__(self, cache_file: str = ".auth_cache"): self.cache_file = cache_file self.cache_data = self._load_cache() def _load_cache(self) -> Dict[str, Any]: """加载缓存""" try: if os.path.exists(self.cache_file): with open(self.cache_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception: pass return {} def _save_cache(self): """保存缓存""" try: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.cache_data, f, ensure_ascii=False, indent=2) except Exception: pass def get_auth_info(self, software_id: str) -> Optional[Dict[str, Any]]: """获取授权信息""" key = f"auth_{software_id}" return self.cache_data.get(key) def set_auth_info(self, software_id: str, auth_info: Dict[str, Any]): """设置授权信息""" key = f"auth_{software_id}" self.cache_data[key] = auth_info self._save_cache() def clear_cache(self, software_id = None): """清除缓存""" if software_id: key = f"auth_{software_id}" self.cache_data.pop(key, None) else: self.cache_data.clear() self._save_cache() class AuthValidator: """授权验证器主类""" def __init__(self, software_id: str, api_url: str = "http://localhost:5000/api/v1", secret_key: str = "taiyi1224", cache_days: int = 7, timeout: int = 3, gui_mode: bool = False): """ 初始化验证器 Args: software_id: 软件ID(在后台创建产品时生成) api_url: 后台API地址 secret_key: 验证密钥 cache_days: 离线缓存有效期(天) timeout: 网络请求超时时间(秒) gui_mode: 是否使用图形界面输入卡密 """ self.software_id = software_id self.api_url = api_url.rstrip('/') self.secret_key = secret_key self.cache_days = cache_days self.timeout = timeout self.gui_mode = gui_mode # 初始化组件 self.machine_generator = MachineCodeGenerator() self.cache = AuthCache() self.machine_code = self._get_or_generate_machine_code() # 验证状态 self.failed_attempts = 0 self.last_attempt_time = None self.locked_until = None def _get_or_generate_machine_code(self) -> str: """获取或生成机器码""" cache_file = ".machine_code" # 尝试从缓存加载 try: if os.path.exists(cache_file): with open(cache_file, 'r') as f: cached_code = f.read().strip() if len(cached_code) == 32: return cached_code except Exception: pass # 生成新的机器码 machine_code = self.machine_generator.generate() # 保存到缓存 try: with open(cache_file, 'w') as f: f.write(machine_code) except Exception: pass return machine_code def _is_locked(self) -> bool: """检查是否被锁定""" if not self.locked_until: return False if datetime.utcnow() > self.locked_until: self.locked_until = None self.failed_attempts = 0 return False return True def _lock_account(self, minutes: int = 10): """锁定账号""" self.locked_until = datetime.utcnow() + timedelta(minutes=minutes) def _get_lock_message(self) -> str: """获取锁定消息""" if self.locked_until: remaining_minutes = int((self.locked_until - datetime.utcnow()).total_seconds() / 60) return f"验证失败次数过多,请等待 {remaining_minutes} 分钟后再试" return "" def _validate_license_format(self, license_key: str) -> bool: """验证卡密格式""" if not license_key: return False license_key = license_key.strip().replace(' ', '').replace('\t', '').upper() # 检查长度(16-32位) if len(license_key) < 16 or len(license_key) > 32: return False # 检查字符(只允许大写字母、数字和下划线) import re pattern = r'^[A-Z0-9_]+$' return bool(re.match(pattern, license_key)) def _input_license_key(self) -> str: """输入卡密""" if self.gui_mode: try: import tkinter as tk from tkinter import simpledialog, messagebox root = tk.Tk() root.withdraw() # 隐藏主窗口 title = "软件授权验证" prompt = f"软件ID: {self.software_id}\n机器码: {self.machine_code[:8]}...\n\n请输入您的卡密:" license_key = simpledialog.askstring(title, prompt) if not license_key: return "" root.destroy() return license_key.strip() except ImportError: # 如果tkinter不可用,使用命令行输入 pass # 命令行输入 print(f"\n=== 软件授权验证 ===") print(f"软件ID: {self.software_id}") print(f"机器码: {self.machine_code[:8]}...{self.machine_code[-8:]}") print(f"提示: 试用卡密以 'TRIAL_' 开头") while True: license_key = input("请输入卡密: ").strip() if license_key: return license_key print("卡密不能为空,请重新输入。") def _show_message(self, title: str, message: str, is_error: bool = False): """显示消息""" if self.gui_mode: try: import tkinter as tk from tkinter import messagebox root = tk.Tk() root.withdraw() messagebox.showerror(title, message) if is_error else messagebox.showinfo(title, message) root.destroy() return except ImportError: pass # 命令行输出 print(f"\n=== {title} ===") print(message) def _online_verify(self, license_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: """在线验证""" try: url = f"{self.api_url}/auth/verify" # 准备请求数据 # 使用与服务端一致的时间戳生成方式 timestamp = int(datetime.utcnow().timestamp()) data = { "software_id": self.software_id, "license_key": license_key, "machine_code": self.machine_code, "timestamp": timestamp } # 生成验证签名 signature_data = f"{data['software_id']}{data['license_key']}{data['machine_code']}{data['timestamp']}" signature = SimpleCrypto.generate_signature(signature_data, self.secret_key) data["signature"] = signature # 发送请求 response = requests.post( url, json=data, timeout=self.timeout, headers={'Content-Type': 'application/json'} ) if response.status_code == 200: result = response.json() if result.get('success'): auth_info = result.get('data', {}) return True, "验证成功", auth_info else: return False, result.get('message', '验证失败'), None else: return False, f"网络请求失败: {response.status_code}", None except requests.exceptions.Timeout: return False, "网络超时,请检查网络连接", None except requests.exceptions.ConnectionError: return False, "无法连接到服务器", None except Exception as e: return False, f"验证过程出错: {str(e)}", None def _offline_verify(self) -> Tuple[bool, str]: """离线验证""" auth_info = self.cache.get_auth_info(self.software_id) if not auth_info: return False, "未找到有效的离线授权信息,请联网验证" # 检查缓存时间 cache_time = datetime.fromisoformat(auth_info.get('cache_time', '')) if datetime.utcnow() - cache_time > timedelta(days=self.cache_days): return False, f"离线授权已过期(超过{self.cache_days}天),请联网验证" # 检查机器码匹配 if auth_info.get('machine_code') != self.machine_code: return False, "设备信息不匹配,请重新验证" # 检查有效期 expire_time = auth_info.get('expire_time') if expire_time: expire_datetime = datetime.fromisoformat(expire_time) if datetime.utcnow() > expire_datetime: return False, "授权已过期,请重新验证" return True, "离线验证成功" def _cache_auth_info(self, auth_info: Dict[str, Any]): """缓存授权信息""" auth_info['cache_time'] = datetime.utcnow().isoformat() self.cache.set_auth_info(self.software_id, auth_info) def validate(self) -> bool: """ 执行验证流程 Returns: bool: 验证是否成功 """ # 检查锁定状态 if self._is_locked(): self._show_message("验证锁定", self._get_lock_message(), True) return False # 首先尝试离线验证 offline_success, offline_message = self._offline_verify() if offline_success: return True # 离线验证失败,尝试在线验证 max_attempts = 3 # 最多尝试3次 for attempt in range(max_attempts): # 输入卡密 license_key = self._input_license_key() if not license_key: self._show_message("验证取消", "未输入卡密,程序退出", True) return False # 验证卡密格式 if not self._validate_license_format(license_key): self._show_message("格式错误", "卡密格式错误,请检查后重新输入", True) continue # 在线验证 success, message, auth_info = self._online_verify(license_key) if success and auth_info: # 验证成功,缓存授权信息 self._cache_auth_info(auth_info) # 检查是否需要更新 force_update = auth_info.get('force_update', False) download_url = auth_info.get('download_url') new_version = auth_info.get('new_version') if force_update and download_url: self._show_message("需要更新", f"发现新版本 {new_version}\n请下载更新后重新启动程序", True) # 尝试打开下载链接 try: import webbrowser webbrowser.open(download_url) except: pass return False self._show_message("验证成功", f"授权验证成功!\n卡密: {license_key}\n有效期至: {auth_info.get('expire_time', '永久')}") return True else: # 验证失败 self.failed_attempts += 1 self.last_attempt_time = datetime.utcnow() if self.failed_attempts >= 5: # 失败5次锁定 self._lock_account() self._show_message("验证失败", self._get_lock_message(), True) return False self._show_message("验证失败", f"验证失败: {message}\n剩余尝试次数: {5 - self.failed_attempts}", True) # 如果不是最后一次尝试,询问是否继续 if attempt < max_attempts - 1: if self.gui_mode: try: import tkinter as tk from tkinter import messagebox root = tk.Tk() root.withdraw() result = messagebox.askyesno("继续验证", "是否继续输入卡密验证?") root.destroy() if not result: return False except ImportError: continue else: continue else: return False return False def get_software_info(self) -> Optional[Dict[str, Any]]: """获取软件信息""" try: url = f"{self.api_url}/software/info" params = {"software_id": self.software_id} response = requests.get(url, params=params, timeout=self.timeout) if response.status_code == 200: result = response.json() if result.get('success'): return result.get('data') except Exception: pass return None def clear_cache(self): """清除本地缓存""" self.cache.clear_cache(self.software_id) # 删除机器码缓存文件 try: if os.path.exists(".machine_code"): os.remove(".machine_code") except Exception: pass # 便捷函数 def validate_license(software_id: str, **kwargs) -> bool: """ 便捷的验证函数 Args: software_id: 软件ID **kwargs: 其他参数(api_url, secret_key, cache_days, timeout, gui_mode) Returns: bool: 验证是否成功 """ validator = AuthValidator(software_id, **kwargs) return validator.validate() def get_machine_code() -> str: """获取当前机器码""" return MachineCodeGenerator.generate()