Kamixitong/app/utils/auth_validator.py
2025-11-16 13:33:32 +08:00

461 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Python软件授权验证器
一款轻量级的软件授权验证模块,支持在线/离线验证模式
使用方法:
from auth_validator import AuthValidator
validator = AuthValidator(software_id="your_software_id")
if not validator.validate():
exit() # 验证失败退出程序
"""
import os
import json
import time
import hashlib
import requests
from datetime import datetime, timedelta
from typing import Optional, Tuple, Dict, Any
# 内置机器码生成器独立版本不依赖app.utils
class MachineCodeGenerator:
"""独立版本的机器码生成器"""
@staticmethod
def generate() -> str:
"""生成32位机器码"""
import platform
import uuid
import subprocess
hw_info = []
try:
# 获取系统信息
system = platform.system().lower()
# 系统UUID
try:
system_uuid = str(uuid.getnode())
if system_uuid and system_uuid != '0':
hw_info.append(system_uuid)
except:
pass
# 主机名
try:
hostname = platform.node()
if hostname:
hw_info.append(hostname)
except:
pass
# 系统信息
try:
system_info = f"{system}_{platform.release()}_{platform.machine()}"
hw_info.append(system_info)
except:
pass
# Python版本作为备用
try:
python_version = platform.python_version()
hw_info.append(python_version)
except:
pass
except Exception as e:
print(f"获取硬件信息时出错: {e}")
# 如果没有获取到任何信息使用随机UUID
if not hw_info:
hw_info = [str(uuid.uuid4()), str(uuid.uuid4())]
# 生成哈希
combined_info = '|'.join(hw_info)
hash_obj = hashlib.sha256(combined_info.encode('utf-8'))
machine_code = hash_obj.hexdigest()[:32].upper()
return machine_code
# 简化的加密工具
class SimpleCrypto:
"""简单的加密解密工具"""
@staticmethod
def generate_hash(data: str, salt: str = "") -> str:
"""生成哈希值"""
combined = f"{data}{salt}".encode('utf-8')
return hashlib.sha256(combined).hexdigest()
@staticmethod
def generate_signature(data: str, secret_key: str) -> str:
"""生成签名"""
combined = f"{data}{secret_key}".encode('utf-8')
return hashlib.sha256(combined).hexdigest()
class AuthCache:
"""授权信息缓存管理"""
def __init__(self, cache_file: str = ".auth_cache"):
self.cache_file = cache_file
self.cache_data = self._load_cache()
def _load_cache(self) -> Dict[str, Any]:
"""加载缓存"""
try:
if os.path.exists(self.cache_file):
with open(self.cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
pass
return {}
def _save_cache(self):
"""保存缓存"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
except Exception:
pass
def get_auth_info(self, software_id: str) -> Optional[Dict[str, Any]]:
"""获取授权信息"""
key = f"auth_{software_id}"
return self.cache_data.get(key)
def set_auth_info(self, software_id: str, auth_info: Dict[str, Any]):
"""设置授权信息"""
key = f"auth_{software_id}"
self.cache_data[key] = auth_info
self._save_cache()
def clear_cache(self, software_id = None):
"""清除缓存"""
if software_id:
key = f"auth_{software_id}"
self.cache_data.pop(key, None)
else:
self.cache_data.clear()
self._save_cache()
class AuthValidator:
"""授权验证器主类"""
def __init__(self,
software_id: str,
api_url: str = "http://localhost:5000/api/v1",
secret_key: str = "taiyi1224",
cache_days: int = 7,
timeout: int = 3,
gui_mode: bool = False):
"""
初始化验证器
Args:
software_id: 软件ID在后台创建产品时生成
api_url: 后台API地址
secret_key: 验证密钥
cache_days: 离线缓存有效期(天)
timeout: 网络请求超时时间(秒)
gui_mode: 是否使用图形界面输入卡密
"""
self.software_id = software_id
self.api_url = api_url.rstrip('/')
self.secret_key = secret_key
self.cache_days = cache_days
self.timeout = timeout
self.gui_mode = gui_mode
# 初始化组件
self.machine_generator = MachineCodeGenerator()
self.cache = AuthCache()
self.machine_code = self._get_or_generate_machine_code()
# 验证状态
self.failed_attempts = 0
self.last_attempt_time = None
self.locked_until = None
def _get_or_generate_machine_code(self) -> str:
"""获取或生成机器码"""
cache_file = ".machine_code"
# 尝试从缓存加载
try:
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
cached_code = f.read().strip()
if len(cached_code) == 32:
return cached_code
except Exception:
pass
# 生成新的机器码
machine_code = self.machine_generator.generate()
# 保存到缓存
try:
with open(cache_file, 'w') as f:
f.write(machine_code)
except Exception:
pass
return machine_code
def _is_locked(self) -> bool:
"""检查是否被锁定"""
if not self.locked_until:
return False
if datetime.utcnow() > self.locked_until:
self.locked_until = None
self.failed_attempts = 0
return False
return True
def _lock_account(self, minutes: int = 10):
"""锁定账号"""
self.locked_until = datetime.utcnow() + timedelta(minutes=minutes)
def _get_lock_message(self) -> str:
"""获取锁定消息"""
if self.locked_until:
remaining_minutes = int((self.locked_until - datetime.utcnow()).total_seconds() / 60)
return f"验证失败次数过多,请等待 {remaining_minutes} 分钟后再试"
return ""
def _validate_license_format(self, license_key: str) -> bool:
"""验证卡密格式(与服务端保持一致)"""
if not license_key:
return False
# 去除空格和制表符,并转为大写
license_key = license_key.strip().replace(' ', '').replace('\t', '').upper()
# 检查是否为XXXX-XXXX-XXXX-XXXX格式
if '-' in license_key:
parts = license_key.split('-')
# 应该有4部分每部分8个字符
if len(parts) == 4 and all(len(part) == 8 for part in parts):
# 检查所有字符是否为大写字母或数字
combined = ''.join(parts)
if len(combined) == 32:
pattern = r'^[A-Z0-9]+$'
import re
return bool(re.match(pattern, combined))
return False
else:
# 兼容旧格式检查长度16-32位
if len(license_key) < 16 or len(license_key) > 32:
return False
# 检查字符(只允许大写字母、数字和下划线)
pattern = r'^[A-Z0-9_]+$'
import re
return bool(re.match(pattern, license_key))
def _input_license_key(self) -> str:
"""输入卡密"""
if self.gui_mode:
try:
import tkinter as tk
from tkinter import simpledialog, messagebox
root = tk.Tk()
root.withdraw() # 隐藏主窗口
title = "软件授权验证"
prompt = f"软件ID: {self.software_id}\n机器码: {self.machine_code[:8]}...\n\n请输入您的卡密:"
license_key = simpledialog.askstring(title, prompt)
if not license_key:
return ""
root.destroy()
return license_key.strip()
except ImportError:
# 如果tkinter不可用使用命令行输入
pass
# 命令行输入
print(f"\n=== 软件授权验证 ===")
print(f"软件ID: {self.software_id}")
print(f"机器码: {self.machine_code[:8]}...{self.machine_code[-8:]}")
print(f"提示: 试用卡密以 'TRIAL_' 开头")
while True:
license_key = input("请输入卡密: ").strip()
if license_key:
return license_key
print("卡密不能为空,请重新输入。")
def _show_message(self, title: str, message: str, is_error: bool = False):
"""显示消息"""
if self.gui_mode:
try:
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.withdraw()
messagebox.showerror(title, message) if is_error else messagebox.showinfo(title, message)
root.destroy()
return
except ImportError:
pass
# 命令行输出
print(f"\n=== {title} ===")
print(message)
def _online_verify(self, license_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
"""在线验证"""
try:
url = f"{self.api_url}/auth/verify"
# 准备请求数据
# 使用与服务端一致的时间戳生成方式
timestamp = int(datetime.utcnow().timestamp())
data = {
"software_id": self.software_id,
"license_key": license_key,
"machine_code": self.machine_code,
"timestamp": timestamp
}
# 生成验证签名
signature_data = f"{data['software_id']}{data['license_key']}{data['machine_code']}{data['timestamp']}"
signature = SimpleCrypto.generate_signature(signature_data, self.secret_key)
data["signature"] = signature
# 发送请求
response = requests.post(
url,
json=data,
timeout=self.timeout,
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
result = response.json()
if result.get('success'):
auth_info = result.get('data', {})
return True, "验证成功", auth_info
else:
return False, result.get('message', '验证失败'), None
else:
return False, f"网络请求失败: {response.status_code}", None
except requests.exceptions.Timeout:
return False, "网络超时,请检查网络连接", None
except requests.exceptions.ConnectionError:
return False, "无法连接到服务器", None
except Exception as e:
return False, f"验证过程出错: {str(e)}", None
def _offline_verify(self) -> Tuple[bool, str]:
"""离线验证"""
auth_info = self.cache.get_auth_info(self.software_id)
if not auth_info:
return False, "未找到有效的离线授权信息,请联网验证"
# 检查缓存时间
cache_time = datetime.fromisoformat(auth_info.get('cache_time', ''))
if datetime.utcnow() - cache_time > timedelta(days=self.cache_days):
return False, f"离线授权已过期(超过{self.cache_days}天),请联网验证"
# 检查机器码匹配
if auth_info.get('machine_code') != self.machine_code:
return False, "设备信息不匹配,请重新验证"
# 检查有效期
expire_time = auth_info.get('expire_time')
if expire_time:
expire_datetime = datetime.fromisoformat(expire_time)
if datetime.utcnow() > expire_datetime:
return False, "授权已过期,请重新验证"
return True, "离线验证成功"
def _cache_auth_info(self, auth_info: Dict[str, Any]):
"""缓存授权信息"""
auth_info['cache_time'] = datetime.utcnow().isoformat()
self.cache.set_auth_info(self.software_id, auth_info)
def validate(self) -> bool:
"""
执行验证流程
Returns:
bool: 验证是否成功
"""
# 检查锁定状态
if self._is_locked():
self._show_message("验证锁定", self._get_lock_message(), True)
return False
# 首先尝试离线验证
offline_success, offline_message = self._offline_verify()
if offline_success:
return True
# 离线验证失败,尝试在线验证
max_attempts = 3 # 最多尝试3次
for attempt in range(max_attempts):
# 获取卡密
license_key = self._input_license_key()
if not license_key:
self._show_message("验证取消", "用户取消了验证操作")
return False
# 验证卡密格式
if not self._validate_license_format(license_key):
self._show_message("格式错误", "卡密格式不正确,请重新输入", True)
continue
# 在线验证
success, message, auth_info = self._online_verify(license_key)
if success:
# 缓存授权信息
if auth_info:
self._cache_auth_info(auth_info)
self._show_message("验证成功", message)
return True
else:
# 记录失败尝试
self.failed_attempts += 1
if self.failed_attempts >= 3:
self._lock_account(10) # 锁定10分钟
self._show_message("验证失败", self._get_lock_message(), True)
return False
else:
remaining_attempts = 3 - self.failed_attempts
self._show_message(
"验证失败",
f"{message}\n\n剩余尝试次数: {remaining_attempts}",
True
)
continue
# 所有尝试都失败
self._show_message("验证失败", "已达到最大尝试次数", True)
return False
def get_auth_info(self) -> Optional[Dict[str, Any]]:
"""
获取当前授权信息
Returns:
Optional[Dict[str, Any]]: 授权信息
"""
return self.cache.get_auth_info(self.software_id)
def clear_auth_cache(self):
"""清除授权缓存"""
self.cache.clear_cache(self.software_id)