ArticleReplace/auth_validator.py
2025-11-23 22:52:25 +08:00

610 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# --- START OF FILE auth_validator.py ---
"""
Python软件授权验证器 (现代化UI版)
功能:
1. 在线验证(每次启动都进行服务器验证)
2. 自动保存/读取历史卡密
3. 现代化深色主题 UI
4. 机器码一键复制
5. 防止后台禁用卡密后仍能使用
6. 验证通过后自动恢复tkinter原始缩放比例
使用方法 (完全兼容旧版):
from auth_validator import AuthValidator
validator = AuthValidator(
software_id="your_software_id",
api_url="http://your-server.com/api/v1",
secret_key="your_secret_key"
)
if not validator.validate():
sys.exit()
"""
import sys
import os
import json
import time
import hashlib
import threading
import requests
import platform
import uuid
from datetime import datetime, timedelta
from typing import Optional, Tuple, Dict, Any
# 尝试导入现代化UI库如果未安装则提示
try:
import customtkinter as ctk
import tkinter as tk
from tkinter import messagebox
except ImportError:
print("错误: 请先安装UI库 -> pip install customtkinter")
sys.exit(1)
# ==========================================
# 1. 基础设施层 (配置与机器码)
# ==========================================
class ConfigManager:
"""管理本地配置(如上次使用的卡密)"""
CONFIG_FILE = "auth_config.json"
@classmethod
def load_config(cls) -> dict:
if os.path.exists(cls.CONFIG_FILE):
try:
with open(cls.CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except:
pass
return {}
@classmethod
def save_last_key(cls, license_key: str):
"""保存最后一次成功的卡密"""
config = cls.load_config()
config['last_license_key'] = license_key
try:
with open(cls.CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2)
except:
pass
@classmethod
def get_last_key(cls) -> str:
return cls.load_config().get('last_license_key', '')
class MachineCodeGenerator:
"""生成稳定的机器码"""
@staticmethod
def get() -> str:
cache_file = ".machine_id"
# 优先读取缓存保持不变
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
return f.read().strip()
hw_info = []
try:
# 1. 平台信息
hw_info.append(platform.node())
hw_info.append(platform.machine())
# 2. MAC地址
hw_info.append(str(uuid.getnode()))
# 3. Windows UUID (如果是Windows)
if platform.system() == "Windows":
try:
import subprocess
cmd = "wmic csproduct get uuid"
uuid_str = subprocess.check_output(cmd).decode().split('\n')[1].strip()
hw_info.append(uuid_str)
except:
pass
except:
hw_info.append(str(uuid.uuid4()))
# 生成Hash
combined = "|".join(hw_info)
code = hashlib.sha256(combined.encode()).hexdigest()[:32].upper()
# 写入缓存
try:
with open(cache_file, 'w') as f:
f.write(code)
except:
pass
return code
# ==========================================
# 2. 核心逻辑层 (验证与通信)
# ==========================================
class AuthCore:
"""处理验证逻辑不包含UI"""
def __init__(self, software_id, api_url, secret_key, timeout=5):
self.software_id = software_id
self.api_url = api_url.rstrip('/')
self.secret_key = secret_key
self.timeout = timeout
self.machine_code = MachineCodeGenerator.get()
self.token_file = f".auth_{software_id}.token"
def test_connection(self) -> Tuple[bool, str]:
"""测试服务器连接"""
try:
# 尝试访问一个简单的端点(如果存在)或直接测试连接
test_url = f"{self.api_url}/auth/verify"
# 发送一个简单的HEAD请求测试连接如果服务器支持
# 否则发送一个最小化的POST请求
test_data = {
"software_id": self.software_id,
"license_key": "TEST",
"machine_code": self.machine_code,
"timestamp": int(time.time()),
"signature": "test"
}
resp = requests.post(test_url, json=test_data, timeout=3)
# 即使返回错误,只要不是连接错误,说明服务器可达
return True, "服务器连接正常"
except requests.exceptions.Timeout:
return False, f"连接超时,服务器可能无响应: {self.api_url}"
except requests.exceptions.ConnectionError as e:
error_detail = str(e)
if "Name or service not known" in error_detail:
return False, f"无法解析服务器地址: {self.api_url}"
elif "Connection refused" in error_detail:
return False, f"服务器拒绝连接,请确认服务器是否运行: {self.api_url}"
else:
return False, f"无法连接到服务器: {self.api_url}"
except Exception as e:
return False, f"连接测试失败: {str(e)}"
def clear_token(self):
"""清除本地Token缓存"""
try:
if os.path.exists(self.token_file):
os.remove(self.token_file)
except:
pass
def verify_online(self, license_key: str) -> Tuple[bool, str, dict]:
"""在线验证"""
try:
# 生成时间戳
timestamp = int(time.time())
# 生成签名数据
signature_data = f"{self.software_id}{license_key}{self.machine_code}{timestamp}"
# 生成签名
combined = f"{signature_data}{self.secret_key}".encode('utf-8')
signature = hashlib.sha256(combined).hexdigest()
# 构建请求数据
request_data = {
"software_id": self.software_id,
"license_key": license_key,
"machine_code": self.machine_code,
"timestamp": timestamp,
"signature": signature,
"software_version": "1.0.0" # 可以后续从配置中读取
}
# 发送POST请求
verify_url = f"{self.api_url}/auth/verify"
try:
resp = requests.post(verify_url, json=request_data, timeout=self.timeout)
except requests.exceptions.Timeout:
return False, f"连接超时({self.timeout}秒),请检查网络连接或服务器地址: {self.api_url}", {}
except requests.exceptions.ConnectionError as e:
# 提供更详细的连接错误信息
error_detail = str(e)
if "Name or service not known" in error_detail or "nodename nor servname provided" in error_detail:
return False, f"无法解析服务器地址请检查API地址是否正确: {self.api_url}", {}
elif "Connection refused" in error_detail:
return False, f"服务器拒绝连接,请确认服务器是否运行在: {self.api_url}", {}
elif "No route to host" in error_detail:
return False, f"无法到达服务器,请检查网络连接: {self.api_url}", {}
else:
return False, f"无法连接到服务器 ({self.api_url}),请检查网络连接和服务器状态", {}
# 检查HTTP状态码
if resp.status_code != 200:
# 处理特定的HTTP状态码
if resp.status_code == 503:
return False, "服务器暂时不可用,请稍后重试", {}
elif resp.status_code == 500:
return False, "服务器内部错误,请联系管理员", {}
elif resp.status_code == 404:
return False, "API接口不存在请检查API地址", {}
elif resp.status_code == 401:
return False, "签名验证失败,请检查密钥配置", {}
else:
try:
error_data = resp.json()
error_msg = error_data.get('message', f'服务器返回错误: {resp.status_code}')
except:
error_msg = f'服务器返回错误: {resp.status_code}'
return False, error_msg, {}
# 解析响应
result = resp.json()
# 检查验证结果
if not result.get('success', False):
error_msg = result.get('message', '验证失败')
return False, error_msg, {}
# 验证成功,提取数据
data = result.get('data', {})
# 构建返回数据(兼容原有格式)
response_data = {
"expire_time": data.get('expire_time'),
"machine_code": self.machine_code,
"last_check": datetime.utcnow().isoformat(),
"license_key": data.get('license_key', license_key),
"type": data.get('type'),
"type_name": data.get('type_name', ''),
"remaining_days": data.get('remaining_days'),
"product_name": data.get('product_name', '')
}
return True, result.get('message', '验证成功'), response_data
except requests.exceptions.Timeout:
return False, "连接超时,请检查网络连接", {}
except requests.exceptions.ConnectionError as e:
return False, f"无法连接到服务器: {str(e)},请检查网络连接和服务器地址", {}
except requests.exceptions.RequestException as e:
return False, f"网络请求失败: {str(e)}", {}
except Exception as e:
return False, f"验证过程出错: {str(e)}", {}
def save_token(self, data: dict):
"""验证成功后保存Token"""
try:
with open(self.token_file, 'w') as f:
json.dump(data, f)
except:
pass
# ==========================================
# 3. 现代化 UI 层 (View)
# ==========================================
class AuthWindow(ctk.CTk):
"""现代化验证窗口"""
def __init__(self, auth_core: AuthCore):
super().__init__()
self.auth_core = auth_core
self.is_verified = False # 验证结果状态
self.auto_verify = False # 是否自动验证标志
self.is_destroyed = False # 窗口是否已销毁标志
self.pending_callbacks = [] # 待执行的after回调ID列表
self.verifying = False # 是否正在验证中
# 窗口基础设置
self.title("软件授权验证有问题联系Vtaiyi1224)")
self.geometry("300x350") # 增加高度以容纳服务器地址显示
self.resizable(False, False)
ctk.set_appearance_mode("Dark")
ctk.set_default_color_theme("blue")
# 绑定窗口关闭事件
self.protocol("WM_DELETE_WINDOW", self._on_closing)
# 居中显示
self._center_window()
self._setup_ui()
# 自动填入上次卡密,如果有则自动验证
self._load_history()
def _on_closing(self):
"""窗口关闭时的处理用户点击X关闭"""
# 如果用户手动关闭窗口,且验证未完成或正在验证中,则视为验证失败
if self.verifying or not self.is_verified:
self.is_verified = False
self.is_destroyed = True
self.verifying = False
# 取消所有pending的after回调
for callback_id in self.pending_callbacks:
try:
self.after_cancel(callback_id)
except:
pass
self.pending_callbacks.clear()
self.destroy()
def _center_window(self):
self.update_idletasks()
width = self.winfo_width()
height = self.winfo_height()
x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f'{width}x{height}+{x}+{y}')
def _setup_ui(self):
# 1. 头部图标与标题
self.header = ctk.CTkFrame(self, fg_color="transparent")
self.header.pack(pady=(40, 20))
ctk.CTkLabel(self.header, text="🔐", font=("Segoe UI Emoji", 56)).pack()
ctk.CTkLabel(self.header, text="用户授权系统", font=("Microsoft YaHei UI", 22, "bold")).pack(pady=5)
# 2. 机器码显示区
self.mc_frame = ctk.CTkFrame(self, fg_color="#2B2B2B", corner_radius=8)
self.mc_frame.pack(padx=30, pady=10, fill="x")
ctk.CTkLabel(self.mc_frame, text="机器码 (点击复制):", font=("Microsoft YaHei UI", 12), text_color="gray").pack(
anchor="w", padx=15, pady=(10, 0))
self.mc_btn = ctk.CTkButton(
self.mc_frame,
text=self.auth_core.machine_code,
font=("Consolas", 11),
fg_color="transparent",
hover_color="#333333",
anchor="w",
command=self._copy_machine_code
)
self.mc_btn.pack(fill="x", padx=5, pady=(0, 10))
# 3. 卡密输入区
self.input_frame = ctk.CTkFrame(self, fg_color="transparent")
self.input_frame.pack(padx=30, pady=10, fill="x")
ctk.CTkLabel(self.input_frame, text="请输入激活码 / 卡密:", font=("Microsoft YaHei UI", 14)).pack(anchor="w",
pady=(0, 5))
self.entry_key = ctk.CTkEntry(
self.input_frame,
height=45,
placeholder_text="XXXXX-XXXXX-XXXXX-XXXXX",
font=("Consolas", 14),
border_width=2
)
self.entry_key.pack(fill="x")
# 4. 服务器地址显示(小字,灰色)
self.lbl_server = ctk.CTkLabel(
self,
text=f"服务器: {self.auth_core.api_url}",
text_color="#666",
font=("Microsoft YaHei UI", 9)
)
self.lbl_server.pack(pady=(10, 0))
# 5. 状态提示信息(支持多行)
self.lbl_status = ctk.CTkLabel(
self,
text="等待验证...",
text_color="gray",
font=("Microsoft YaHei UI", 12),
wraplength=360, # 允许自动换行
justify="left"
)
self.lbl_status.pack(pady=(10, 5))
# 6. 验证按钮
self.btn_verify = ctk.CTkButton(
self,
text="立即验证授权",
height=50,
font=("Microsoft YaHei UI", 16, "bold"),
command=self._handle_verify
)
self.btn_verify.pack(padx=30, pady=20, fill="x")
# 底部版权
ctk.CTkLabel(self, text="Powered by AuthValidator", font=("Arial", 10), text_color="#444").pack(side="bottom",
pady=10)
def _load_history(self):
"""读取历史卡密,如果有则自动验证"""
last_key = ConfigManager.get_last_key()
if last_key:
self.entry_key.insert(0, last_key)
self.lbl_status.configure(text="已自动填入上次卡密,正在验证中...", text_color="#2196F3")
# 延迟100ms后自动触发验证确保UI已完全加载
callback_id = self.after(100, self._safe_auto_verify)
self.pending_callbacks.append(callback_id)
else:
self.lbl_status.configure(text="请输入卡密并点击验证", text_color="gray")
def _safe_auto_verify(self):
"""安全地自动验证保存的卡密"""
if self.is_destroyed:
return
key = self.entry_key.get().strip()
if key:
self.auto_verify = True
self._handle_verify()
def _copy_machine_code(self):
if self.is_destroyed:
return
self.clipboard_clear()
self.clipboard_append(self.auth_core.machine_code)
self.lbl_status.configure(text="✅ 机器码已复制到剪贴板", text_color="#4CAF50")
callback_id = self.after(2000, self._safe_reset_status)
self.pending_callbacks.append(callback_id)
def _safe_reset_status(self):
"""安全地重置状态提示"""
if not self.is_destroyed:
self.lbl_status.configure(text="等待验证...", text_color="gray")
def _handle_verify(self):
key = self.entry_key.get().strip()
if not key:
self.lbl_status.configure(text="❌ 卡密不能为空", text_color="#F44336")
return
# 如果正在验证中,忽略重复请求
if self.verifying:
return
# 标记为正在验证
self.verifying = True
self.is_verified = False # 重置验证状态
# 锁定UI
self.btn_verify.configure(state="disabled", text="正在连接服务器...")
self.entry_key.configure(state="disabled")
self.lbl_status.configure(text="⏳ 正在验证中,请稍候...", text_color="#2196F3")
# 开启线程进行验证
threading.Thread(target=self._verify_thread, args=(key,), daemon=True).start()
def _verify_thread(self, key):
"""后台验证逻辑"""
success, msg, data = self.auth_core.verify_online(key)
# 回到主线程更新UI使用安全的方式
if not self.is_destroyed:
callback_id = self.after(0, lambda: self._on_verify_result(success, msg, key, data))
self.pending_callbacks.append(callback_id)
def _on_verify_result(self, success, msg, key, data):
# 如果窗口已销毁,直接返回
if self.is_destroyed:
return
# 标记验证完成
self.verifying = False
self.btn_verify.configure(state="normal", text="立即验证授权")
self.entry_key.configure(state="normal")
if success:
# 验证成功 - 必须先设置 is_verified再关闭窗口
self.is_verified = True
self.lbl_status.configure(text=f"{msg}", text_color="#4CAF50")
# 保存卡密和Token
ConfigManager.save_last_key(key)
self.auth_core.save_token(data)
# 如果是自动验证延迟关闭窗口如果是手动验证给用户1秒查看结果后关闭
delay = 800 if self.auto_verify else 1000
callback_id = self.after(delay, self._safe_close_window)
self.pending_callbacks.append(callback_id)
else:
# 验证失败,确保 is_verified 为 False
self.is_verified = False
# 清除本地缓存
self.auth_core.clear_token()
# 格式化错误消息如果是连接错误显示API地址
error_msg = msg
if "无法连接" in msg or "连接超时" in msg or "服务器" in msg:
# 在错误消息中已经包含了API地址直接显示
pass
elif len(msg) > 80:
# 如果错误消息太长,截断并添加提示
error_msg = msg[:80] + "..."
# 如果是自动验证失败,允许用户修改卡密后重新验证
if self.auto_verify:
self.lbl_status.configure(text=f"{error_msg}\n请检查网络连接或重新输入卡密", text_color="#F44336")
self.auto_verify = False # 重置标志,允许手动验证
else:
self.lbl_status.configure(text=f"{error_msg}", text_color="#F44336")
def _safe_close_window(self):
"""安全地关闭窗口"""
if not self.is_destroyed:
self.is_destroyed = True
# 取消所有pending的after回调
for callback_id in self.pending_callbacks:
try:
self.after_cancel(callback_id)
except:
pass
self.pending_callbacks.clear()
self.destroy()
# ==========================================
# 4. 统一接口层 (Facade)
# ==========================================
class AuthValidator:
"""
授权验证器主类
保持与旧版一致的调用接口
"""
def __init__(self,
software_id: str,
api_url: str = "http://localhost:5000/api/v1",
secret_key: str = "default_secret",
**kwargs):
"""
初始化验证器
Args:
software_id: 软件ID
api_url: API地址
secret_key: 密钥
"""
self.core = AuthCore(software_id, api_url, secret_key)
def validate(self) -> bool:
"""
执行验证流程 (阻塞式)
每次打开都必须进行在线验证,防止后台禁用卡密后用户仍能使用
1. 读取保存的卡密(如果有)
2. 弹出现代化UI窗口并自动验证
3. 验证失败则要求用户重新输入
4. 验证成功后恢复tkinter的原始缩放比例
Returns:
bool: 是否验证成功
"""
# 启动 UI 窗口(会自动读取保存的卡密并验证)
app = AuthWindow(self.core)
# 运行主循环 (这会阻塞代码执行,直到窗口关闭)
app.mainloop()
# 窗口关闭后,如果验证成功,重置 tkinter 的缩放比例
if app.is_verified:
self._reset_tk_scaling()
# 窗口关闭后,检查是否验证成功
return app.is_verified
def _reset_tk_scaling(self):
"""
重置 tkinter 的 DPI 缩放,恢复到系统默认值
因为 customtkinter 会修改全局的 tkinter 缩放设置
这样可以确保后续创建的 tkinter 窗口使用正常的尺寸
"""
try:
# 创建一个临时的 tkinter 根窗口来重置缩放
temp_root = tk.Tk()
temp_root.withdraw() # 隐藏窗口
# 重置缩放因子为系统默认值(通常是 1.0
# 这会影响后续所有 tkinter 窗口的尺寸计算
temp_root.tk.call('tk', 'scaling', 1.0)
# 销毁临时窗口
temp_root.destroy()
except Exception as e:
# 如果重置失败,静默处理,不影响主流程
pass
# --- END OF FILE ---