# --- START OF FILE auth_validator.py --- """ Python软件授权验证器 (现代化UI版) 功能: 1. 在线验证(每次启动都进行服务器验证) 2. 自动保存/读取历史卡密 3. 现代化深色主题 UI 4. 机器码一键复制 5. 防止后台禁用卡密后仍能使用 使用方法 (完全兼容旧版): from auth_validator import AuthValidator validator = AuthValidator( software_id="your_software_id", api_url="http://your-server.com/api/v1", secret_key="your_secret_key" ) if not validator.validate(): exit() """ import os import json import time import hashlib import threading import requests import platform import uuid from datetime import datetime, timedelta from typing import Optional, Tuple, Dict, Any # 尝试导入现代化UI库,如果未安装则提示 try: import customtkinter as ctk import tkinter as tk from tkinter import messagebox except ImportError: print("错误: 请先安装UI库 -> pip install customtkinter") exit(1) # ========================================== # 1. 基础设施层 (配置与机器码) # ========================================== class ConfigManager: """管理本地配置(如上次使用的卡密)""" CONFIG_FILE = "auth_config.json" @classmethod def load_config(cls) -> dict: if os.path.exists(cls.CONFIG_FILE): try: with open(cls.CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: pass return {} @classmethod def save_last_key(cls, license_key: str): """保存最后一次成功的卡密""" config = cls.load_config() config['last_license_key'] = license_key try: with open(cls.CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2) except: pass @classmethod def get_last_key(cls) -> str: return cls.load_config().get('last_license_key', '') class MachineCodeGenerator: """生成稳定的机器码""" @staticmethod def get() -> str: cache_file = ".machine_id" # 优先读取缓存保持不变 if os.path.exists(cache_file): with open(cache_file, 'r') as f: return f.read().strip() hw_info = [] try: # 1. 平台信息 hw_info.append(platform.node()) hw_info.append(platform.machine()) # 2. MAC地址 hw_info.append(str(uuid.getnode())) # 3. Windows UUID (如果是Windows) if platform.system() == "Windows": try: import subprocess cmd = "wmic csproduct get uuid" uuid_str = subprocess.check_output(cmd).decode().split('\n')[1].strip() hw_info.append(uuid_str) except: pass except: hw_info.append(str(uuid.uuid4())) # 生成Hash combined = "|".join(hw_info) code = hashlib.sha256(combined.encode()).hexdigest()[:32].upper() # 写入缓存 try: with open(cache_file, 'w') as f: f.write(code) except: pass return code # ========================================== # 2. 核心逻辑层 (验证与通信) # ========================================== class AuthCore: """处理验证逻辑,不包含UI""" def __init__(self, software_id, api_url, secret_key, timeout=5): self.software_id = software_id self.api_url = api_url.rstrip('/') self.secret_key = secret_key self.timeout = timeout self.machine_code = MachineCodeGenerator.get() self.token_file = f".auth_{software_id}.token" def test_connection(self) -> Tuple[bool, str]: """测试服务器连接""" try: # 尝试访问一个简单的端点(如果存在)或直接测试连接 test_url = f"{self.api_url}/auth/verify" # 发送一个简单的HEAD请求测试连接(如果服务器支持) # 否则发送一个最小化的POST请求 test_data = { "software_id": self.software_id, "license_key": "TEST", "machine_code": self.machine_code, "timestamp": int(time.time()), "signature": "test" } resp = requests.post(test_url, json=test_data, timeout=3) # 即使返回错误,只要不是连接错误,说明服务器可达 return True, "服务器连接正常" except requests.exceptions.Timeout: return False, f"连接超时,服务器可能无响应: {self.api_url}" except requests.exceptions.ConnectionError as e: error_detail = str(e) if "Name or service not known" in error_detail: return False, f"无法解析服务器地址: {self.api_url}" elif "Connection refused" in error_detail: return False, f"服务器拒绝连接,请确认服务器是否运行: {self.api_url}" else: return False, f"无法连接到服务器: {self.api_url}" except Exception as e: return False, f"连接测试失败: {str(e)}" def clear_token(self): """清除本地Token缓存""" try: if os.path.exists(self.token_file): os.remove(self.token_file) except: pass def verify_online(self, license_key: str) -> Tuple[bool, str, dict]: """在线验证""" try: # 生成时间戳 timestamp = int(time.time()) # 生成签名数据 signature_data = f"{self.software_id}{license_key}{self.machine_code}{timestamp}" # 生成签名 combined = f"{signature_data}{self.secret_key}".encode('utf-8') signature = hashlib.sha256(combined).hexdigest() # 构建请求数据 request_data = { "software_id": self.software_id, "license_key": license_key, "machine_code": self.machine_code, "timestamp": timestamp, "signature": signature, "software_version": "1.0.0" # 可以后续从配置中读取 } # 发送POST请求 verify_url = f"{self.api_url}/auth/verify" try: resp = requests.post(verify_url, json=request_data, timeout=self.timeout) except requests.exceptions.Timeout: return False, f"连接超时({self.timeout}秒),请检查网络连接或服务器地址: {self.api_url}", {} except requests.exceptions.ConnectionError as e: # 提供更详细的连接错误信息 error_detail = str(e) if "Name or service not known" in error_detail or "nodename nor servname provided" in error_detail: return False, f"无法解析服务器地址,请检查API地址是否正确: {self.api_url}", {} elif "Connection refused" in error_detail: return False, f"服务器拒绝连接,请确认服务器是否运行在: {self.api_url}", {} elif "No route to host" in error_detail: return False, f"无法到达服务器,请检查网络连接: {self.api_url}", {} else: return False, f"无法连接到服务器 ({self.api_url}),请检查网络连接和服务器状态", {} # 检查HTTP状态码 if resp.status_code != 200: # 处理特定的HTTP状态码 if resp.status_code == 503: return False, "服务器暂时不可用,请稍后重试", {} elif resp.status_code == 500: return False, "服务器内部错误,请联系管理员", {} elif resp.status_code == 404: return False, "API接口不存在,请检查API地址", {} elif resp.status_code == 401: return False, "签名验证失败,请检查密钥配置", {} else: try: error_data = resp.json() error_msg = error_data.get('message', f'服务器返回错误: {resp.status_code}') except: error_msg = f'服务器返回错误: {resp.status_code}' return False, error_msg, {} # 解析响应 result = resp.json() # 检查验证结果 if not result.get('success', False): error_msg = result.get('message', '验证失败') return False, error_msg, {} # 验证成功,提取数据 data = result.get('data', {}) # 构建返回数据(兼容原有格式) response_data = { "expire_time": data.get('expire_time'), "machine_code": self.machine_code, "last_check": datetime.utcnow().isoformat(), "license_key": data.get('license_key', license_key), "type": data.get('type'), "type_name": data.get('type_name', ''), "remaining_days": data.get('remaining_days'), "product_name": data.get('product_name', '') } return True, result.get('message', '验证成功'), response_data except requests.exceptions.Timeout: return False, "连接超时,请检查网络连接", {} except requests.exceptions.ConnectionError as e: return False, f"无法连接到服务器: {str(e)},请检查网络连接和服务器地址", {} except requests.exceptions.RequestException as e: return False, f"网络请求失败: {str(e)}", {} except Exception as e: return False, f"验证过程出错: {str(e)}", {} def save_token(self, data: dict): """验证成功后保存Token""" try: with open(self.token_file, 'w') as f: json.dump(data, f) except: pass # ========================================== # 3. 现代化 UI 层 (View) # ========================================== class AuthWindow(ctk.CTk): """现代化验证窗口""" def __init__(self, auth_core: AuthCore): super().__init__() self.auth_core = auth_core self.is_verified = False # 验证结果状态 self.auto_verify = False # 是否自动验证标志 self.is_destroyed = False # 窗口是否已销毁标志 self.pending_callbacks = [] # 待执行的after回调ID列表 self.verifying = False # 是否正在验证中 # 窗口基础设置 self.title("软件授权验证") self.geometry("300x350") # 增加高度以容纳服务器地址显示 self.resizable(False, False) ctk.set_appearance_mode("Dark") ctk.set_default_color_theme("blue") # 绑定窗口关闭事件 self.protocol("WM_DELETE_WINDOW", self._on_closing) # 居中显示 self._center_window() self._setup_ui() # 自动填入上次卡密,如果有则自动验证 self._load_history() def _on_closing(self): """窗口关闭时的处理(用户点击X关闭)""" # 如果用户手动关闭窗口,且验证未完成或正在验证中,则视为验证失败 if self.verifying or not self.is_verified: self.is_verified = False self.is_destroyed = True self.verifying = False # 取消所有pending的after回调 for callback_id in self.pending_callbacks: try: self.after_cancel(callback_id) except: pass self.pending_callbacks.clear() self.destroy() def _center_window(self): self.update_idletasks() width = self.winfo_width() height = self.winfo_height() x = (self.winfo_screenwidth() // 2) - (width // 2) y = (self.winfo_screenheight() // 2) - (height // 2) self.geometry(f'{width}x{height}+{x}+{y}') def _setup_ui(self): # 1. 头部图标与标题 self.header = ctk.CTkFrame(self, fg_color="transparent") self.header.pack(pady=(40, 20)) ctk.CTkLabel(self.header, text="🔐", font=("Segoe UI Emoji", 56)).pack() ctk.CTkLabel(self.header, text="用户授权系统", font=("Microsoft YaHei UI", 22, "bold")).pack(pady=5) # 2. 机器码显示区 self.mc_frame = ctk.CTkFrame(self, fg_color="#2B2B2B", corner_radius=8) self.mc_frame.pack(padx=30, pady=10, fill="x") ctk.CTkLabel(self.mc_frame, text="机器码 (点击复制):", font=("Microsoft YaHei UI", 12), text_color="gray").pack( anchor="w", padx=15, pady=(10, 0)) self.mc_btn = ctk.CTkButton( self.mc_frame, text=self.auth_core.machine_code, font=("Consolas", 11), fg_color="transparent", hover_color="#333333", anchor="w", command=self._copy_machine_code ) self.mc_btn.pack(fill="x", padx=5, pady=(0, 10)) # 3. 卡密输入区 self.input_frame = ctk.CTkFrame(self, fg_color="transparent") self.input_frame.pack(padx=30, pady=10, fill="x") ctk.CTkLabel(self.input_frame, text="请输入激活码 / 卡密:", font=("Microsoft YaHei UI", 14)).pack(anchor="w", pady=(0, 5)) self.entry_key = ctk.CTkEntry( self.input_frame, height=45, placeholder_text="XXXXX-XXXXX-XXXXX-XXXXX", font=("Consolas", 14), border_width=2 ) self.entry_key.pack(fill="x") # 4. 服务器地址显示(小字,灰色) self.lbl_server = ctk.CTkLabel( self, text=f"服务器: {self.auth_core.api_url}", text_color="#666", font=("Microsoft YaHei UI", 9) ) self.lbl_server.pack(pady=(10, 0)) # 5. 状态提示信息(支持多行) self.lbl_status = ctk.CTkLabel( self, text="等待验证...", text_color="gray", font=("Microsoft YaHei UI", 12), wraplength=360, # 允许自动换行 justify="left" ) self.lbl_status.pack(pady=(10, 5)) # 6. 验证按钮 self.btn_verify = ctk.CTkButton( self, text="立即验证授权", height=50, font=("Microsoft YaHei UI", 16, "bold"), command=self._handle_verify ) self.btn_verify.pack(padx=30, pady=20, fill="x") # 底部版权 ctk.CTkLabel(self, text="Powered by AuthValidator", font=("Arial", 10), text_color="#444").pack(side="bottom", pady=10) def _load_history(self): """读取历史卡密,如果有则自动验证""" last_key = ConfigManager.get_last_key() if last_key: self.entry_key.insert(0, last_key) self.lbl_status.configure(text="已自动填入上次卡密,正在验证中...", text_color="#2196F3") # 延迟100ms后自动触发验证,确保UI已完全加载 callback_id = self.after(100, self._safe_auto_verify) self.pending_callbacks.append(callback_id) else: self.lbl_status.configure(text="请输入卡密并点击验证", text_color="gray") def _safe_auto_verify(self): """安全地自动验证保存的卡密""" if self.is_destroyed: return key = self.entry_key.get().strip() if key: self.auto_verify = True self._handle_verify() def _copy_machine_code(self): if self.is_destroyed: return self.clipboard_clear() self.clipboard_append(self.auth_core.machine_code) self.lbl_status.configure(text="✅ 机器码已复制到剪贴板", text_color="#4CAF50") callback_id = self.after(2000, self._safe_reset_status) self.pending_callbacks.append(callback_id) def _safe_reset_status(self): """安全地重置状态提示""" if not self.is_destroyed: self.lbl_status.configure(text="等待验证...", text_color="gray") def _handle_verify(self): key = self.entry_key.get().strip() if not key: self.lbl_status.configure(text="❌ 卡密不能为空", text_color="#F44336") return # 如果正在验证中,忽略重复请求 if self.verifying: return # 标记为正在验证 self.verifying = True self.is_verified = False # 重置验证状态 # 锁定UI self.btn_verify.configure(state="disabled", text="正在连接服务器...") self.entry_key.configure(state="disabled") self.lbl_status.configure(text="⏳ 正在验证中,请稍候...", text_color="#2196F3") # 开启线程进行验证 threading.Thread(target=self._verify_thread, args=(key,), daemon=True).start() def _verify_thread(self, key): """后台验证逻辑""" success, msg, data = self.auth_core.verify_online(key) # 回到主线程更新UI,使用安全的方式 if not self.is_destroyed: callback_id = self.after(0, lambda: self._on_verify_result(success, msg, key, data)) self.pending_callbacks.append(callback_id) def _on_verify_result(self, success, msg, key, data): # 如果窗口已销毁,直接返回 if self.is_destroyed: return # 标记验证完成 self.verifying = False self.btn_verify.configure(state="normal", text="立即验证授权") self.entry_key.configure(state="normal") if success: # 验证成功 - 必须先设置 is_verified,再关闭窗口 self.is_verified = True self.lbl_status.configure(text=f"✅ {msg}", text_color="#4CAF50") # 保存卡密和Token ConfigManager.save_last_key(key) self.auth_core.save_token(data) # 如果是自动验证,延迟关闭窗口;如果是手动验证,给用户1秒查看结果后关闭 delay = 800 if self.auto_verify else 1000 callback_id = self.after(delay, self._safe_close_window) self.pending_callbacks.append(callback_id) else: # 验证失败,确保 is_verified 为 False self.is_verified = False # 清除本地缓存 self.auth_core.clear_token() # 格式化错误消息,如果是连接错误,显示API地址 error_msg = msg if "无法连接" in msg or "连接超时" in msg or "服务器" in msg: # 在错误消息中已经包含了API地址,直接显示 pass elif len(msg) > 80: # 如果错误消息太长,截断并添加提示 error_msg = msg[:80] + "..." # 如果是自动验证失败,允许用户修改卡密后重新验证 if self.auto_verify: self.lbl_status.configure(text=f"❌ {error_msg}\n请检查网络连接或重新输入卡密", text_color="#F44336") self.auto_verify = False # 重置标志,允许手动验证 else: self.lbl_status.configure(text=f"❌ {error_msg}", text_color="#F44336") def _safe_close_window(self): """安全地关闭窗口""" if not self.is_destroyed: self.is_destroyed = True # 取消所有pending的after回调 for callback_id in self.pending_callbacks: try: self.after_cancel(callback_id) except: pass self.pending_callbacks.clear() self.destroy() # ========================================== # 4. 统一接口层 (Facade) # ========================================== class AuthValidator: """ 授权验证器主类 保持与旧版一致的调用接口 """ def __init__(self, software_id: str, api_url: str = "http://localhost:5000/api/v1", secret_key: str = "default_secret", **kwargs): """ 初始化验证器 Args: software_id: 软件ID api_url: API地址 secret_key: 密钥 """ self.core = AuthCore(software_id, api_url, secret_key) def validate(self) -> bool: """ 执行验证流程 (阻塞式) 每次打开都必须进行在线验证,防止后台禁用卡密后用户仍能使用 1. 读取保存的卡密(如果有) 2. 弹出现代化UI窗口并自动验证 3. 验证失败则要求用户重新输入 Returns: bool: 是否验证成功 """ # 启动 UI 窗口(会自动读取保存的卡密并验证) app = AuthWindow(self.core) # 运行主循环 (这会阻塞代码执行,直到窗口关闭) app.mainloop() # 窗口关闭后,检查是否验证成功 return app.is_verified # --- END OF FILE ---