Kamixitong/app/api/user.py
2025-11-19 22:49:24 +08:00

618 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import request, jsonify, current_app
from app import db
from app.models import Product, License, Ticket, Version
from . import api_bp
from app.utils.logger import log_operation
import re
# ==================== 产品相关接口 ====================
@api_bp.route('/user/products', methods=['GET'])
def get_user_products():
"""用户端获取产品列表(无需认证)"""
try:
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 20, type=int), 100)
keyword = request.args.get('keyword', '').strip()
product_type = request.args.get('type') # 产品类型筛选
is_paid = request.args.get('is_paid', type=int) # 付费筛选1=付费0=免费
query = Product.query.filter_by(status=1) # 只显示启用的产品
# 关键词搜索
if keyword:
query = query.filter(
db.or_(
Product.product_name.like(f'%{keyword}%'),
Product.description.like(f'%{keyword}%')
)
)
# 产品类型筛选
if product_type:
query = query.filter(Product.product_type == product_type)
query = query.order_by(Product.create_time.desc())
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
products = []
for product in pagination.items:
product_dict = product.to_dict()
# 获取最新版本信息
latest_version = Version.query.filter_by(
product_id=product.product_id,
publish_status=1
).order_by(Version.create_time.desc()).first()
product_dict['latest_version'] = latest_version.version_num if latest_version else None
# 移除不存在的is_paid字段
products.append(product_dict)
return jsonify({
'success': True,
'data': {
'products': products,
'pagination': {
'page': page,
'per_page': per_page,
'total': pagination.total,
'pages': pagination.pages,
'has_prev': pagination.has_prev,
'has_next': pagination.has_next
}
}
})
except Exception as e:
current_app.logger.error(f"获取产品列表失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误,请稍后重试'
}), 500
@api_bp.route('/user/products/<product_id>', methods=['GET'])
def get_user_product(product_id):
"""用户端获取产品详情(无需认证)"""
try:
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({
'success': False,
'message': '产品不存在或已下架'
}), 404
product_dict = product.to_dict(include_stats=True)
# 获取最新版本信息
latest_version = Version.query.filter_by(
product_id=product_id,
publish_status=1
).order_by(Version.create_time.desc()).first()
if latest_version:
product_dict['latest_version'] = {
'version_num': latest_version.version_num,
'update_time': latest_version.create_time.isoformat(),
'update_log': latest_version.update_log
}
# 获取最近3条更新日志
recent_versions = Version.query.filter_by(
product_id=product_id,
publish_status=1
).order_by(Version.create_time.desc()).limit(3).all()
product_dict['recent_updates'] = [
{
'version_num': v.version_num,
'update_time': v.create_time.isoformat(),
'update_log': v.update_log
} for v in recent_versions
]
# 移除不存在的is_paid字段
return jsonify({
'success': True,
'data': product_dict
})
except Exception as e:
current_app.logger.error(f"获取产品详情失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
# ==================== 卡密相关接口 ====================
@api_bp.route('/user/licenses/packages', methods=['GET'])
def get_license_packages():
"""用户端获取卡密套餐(按产品筛选,无需认证)"""
try:
product_id = request.args.get('product_id')
if not product_id:
return jsonify({
'success': False,
'message': '缺少产品ID参数'
}), 400
# 验证产品是否存在且启用
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({
'success': False,
'message': '产品不存在或已下架'
}), 404
# 获取该产品的付费套餐信息(这里简化处理,实际应从套餐表获取)
# 假设我们有一些预设的套餐,使用与后台卡密管理一致的分类
# 从License模型中获取时长类型
from app.models.license import License
# 定义一个简单的函数来获取时长类型,避免实例化问题
def get_duration_type(days):
if days == -1:
return '永久卡'
elif days == 1:
return '天卡'
elif days == 30:
return '月卡'
elif days == 90:
return '季卡'
elif days == 365:
return '年卡'
else:
return f'{days}天卡'
packages = [
{
'package_id': f'{product_id}_year',
'name': '年卡授权',
'price': 299.00,
'duration': 365,
'duration_text': get_duration_type(365),
'description': '一年期正版授权'
},
{
'package_id': f'{product_id}_season',
'name': '季卡授权',
'price': 99.00,
'duration': 90,
'duration_text': get_duration_type(90),
'description': '一季度正版授权'
},
{
'package_id': f'{product_id}_month',
'name': '月卡授权',
'price': 39.90,
'duration': 30,
'duration_text': get_duration_type(30),
'description': '一个月正版授权'
},
{
'package_id': f'{product_id}_day',
'name': '天卡授权',
'price': 5.00,
'duration': 1,
'duration_text': get_duration_type(1),
'description': '一天期正版授权'
}
]
return jsonify({
'success': True,
'data': {
'product': product.to_dict(),
'packages': packages
}
})
except Exception as e:
current_app.logger.error(f"获取卡密套餐失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
@api_bp.route('/user/licenses/verify', methods=['GET'])
def user_verify_license():
"""用户端验证卡密(下载用,无需认证)"""
try:
license_key = request.args.get('license_key')
product_id = request.args.get('product_id')
if not all([license_key, product_id]):
return jsonify({
'success': False,
'message': '缺少必要参数'
}), 400
# 查找卡密
license_obj = License.query.filter_by(
license_key=license_key,
product_id=product_id
).first()
if not license_obj:
return jsonify({
'success': False,
'message': '卡密不存在'
}), 404
# 检查卡密状态
if license_obj.status != 1:
return jsonify({
'success': False,
'message': '卡密未激活或已失效'
}), 403
# 检查是否过期
if license_obj.is_expired():
return jsonify({
'success': False,
'message': '卡密已过期'
}), 403
# 获取产品信息
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({
'success': False,
'message': '产品不存在或已下架'
}), 404
# 获取下载链接(从最新版本获取)
latest_version = Version.query.filter_by(
product_id=product_id,
publish_status=1
).order_by(Version.create_time.desc()).first()
return jsonify({
'success': True,
'message': '卡密验证成功',
'data': {
'license_key': license_obj.license_key,
'product_name': product.product_name,
'expire_time': license_obj.expire_time.isoformat() if license_obj.expire_time else None,
'download_url': latest_version.download_url if latest_version else None,
'can_download': True
}
})
except Exception as e:
current_app.logger.error(f"卡密验证失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
# ==================== 工单相关接口 ====================
@api_bp.route('/user/tickets', methods=['POST'])
def create_user_ticket():
"""用户端创建工单(无需认证)"""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'message': '请求数据为空'}), 400
product_id = data.get('product_id')
contact_person = data.get('contact_person', '').strip()
phone = data.get('phone', '').strip()
title = data.get('title', '').strip()
description = data.get('description', '').strip()
attachment = data.get('attachment') # 附件信息
# 验证必填字段
if not all([product_id, contact_person, phone, title, description]):
return jsonify({'success': False, 'message': '缺少必要参数'}), 400
# 验证手机号格式
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({'success': False, 'message': '手机号格式不正确'}), 400
# 验证产品存在且启用
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({'success': False, 'message': '产品不存在或已下架'}), 404
ticket = Ticket(
title=title,
product_id=product_id,
description=description,
contact_person=contact_person,
phone=phone,
attachment=attachment,
priority=data.get('priority', 1),
source=1 # 用户提交的工单
)
db.session.add(ticket)
db.session.commit()
# 记录操作日志
log_operation('CREATE_USER_TICKET', 'TICKET', ticket.ticket_id, {
'title': ticket.title,
'product_id': ticket.product_id,
'contact_person': contact_person,
'phone': phone
})
return jsonify({
'success': True,
'message': '工单提交成功',
'data': {
'ticket_id': ticket.ticket_id,
'ticket_number': ticket.ticket_number
}
})
except Exception as e:
db.session.rollback()
current_app.logger.error(f"创建工单失败: {str(e)}")
return jsonify({'success': False, 'message': '服务器内部错误'}), 500
@api_bp.route('/user/tickets', methods=['GET'])
def get_user_tickets():
"""用户端获取工单列表(通过手机号查询,无需认证)"""
try:
phone = request.args.get('phone')
if not phone:
return jsonify({
'success': False,
'message': '缺少手机号参数'
}), 400
# 验证手机号格式
if not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({
'success': False,
'message': '手机号格式不正确'
}), 400
# 查询该手机号提交的工单
tickets = Ticket.query.filter_by(phone=phone).order_by(Ticket.create_time.desc()).all()
return jsonify({
'success': True,
'data': [ticket.to_dict() for ticket in tickets]
})
except Exception as e:
current_app.logger.error(f"获取工单列表失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
@api_bp.route('/user/tickets/query', methods=['GET'])
def query_user_ticket():
"""用户端查询工单(通过工单编号+手机号,无需认证)"""
try:
ticket_number = request.args.get('ticket_number')
phone = request.args.get('phone')
if not all([ticket_number, phone]):
return jsonify({
'success': False,
'message': '缺少必要参数'
}), 400
# 验证手机号格式
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({
'success': False,
'message': '手机号格式不正确'
}), 400
# 查找工单
ticket = Ticket.query.filter_by(
ticket_number=ticket_number,
phone=phone
).first()
if not ticket:
return jsonify({
'success': False,
'message': '工单不存在或手机号不匹配'
}), 404
return jsonify({
'success': True,
'data': ticket.to_dict()
})
except Exception as e:
current_app.logger.error(f"查询工单失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
# ==================== 订单相关接口 ====================
@api_bp.route('/user/orders', methods=['POST'])
def create_user_order():
"""用户端创建订单(无需认证)"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据为空'
}), 400
product_id = data.get('product_id')
package_id = data.get('package_id')
contact_person = data.get('contact_person', '').strip()
phone = data.get('phone', '').strip()
quantity = data.get('quantity', 1)
# 验证必填字段
if not all([product_id, package_id, contact_person, phone]):
return jsonify({
'success': False,
'message': '缺少必要参数'
}), 400
# 验证手机号格式
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({
'success': False,
'message': '手机号格式不正确'
}), 400
# 验证数量
if not isinstance(quantity, int) or quantity < 1 or quantity > 5:
return jsonify({
'success': False,
'message': '购买数量必须在1-5之间'
}), 400
# 验证产品存在且启用
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({
'success': False,
'message': '产品不存在或已下架'
}), 404
# 生成订单号(简化处理,实际应有更复杂的订单号生成规则)
import time
import random
order_number = f"ORD{int(time.time())}{random.randint(100, 999)}"
# 简化处理:直接返回支付链接(实际应对接支付系统)
# 这里模拟支付链接生成
payment_url = f"/payment?order_number={order_number}&amount=0.01" # 示例金额
# 记录操作日志
log_operation('CREATE_USER_ORDER', 'ORDER', None, {
'order_number': order_number,
'product_id': product_id,
'package_id': package_id,
'contact_person': contact_person,
'phone': phone,
'quantity': quantity
})
return jsonify({
'success': True,
'message': '订单创建成功',
'data': {
'order_number': order_number,
'payment_url': payment_url,
'amount': 0.01 # 示例金额
}
})
except Exception as e:
current_app.logger.error(f"创建订单失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
@api_bp.route('/user/orders/query', methods=['GET'])
def query_user_order():
"""用户端查询订单(通过手机号+订单号,无需认证)"""
try:
order_number = request.args.get('order_number')
phone = request.args.get('phone')
if not all([order_number, phone]):
return jsonify({
'success': False,
'message': '缺少必要参数'
}), 400
# 验证手机号格式
if not phone or not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({
'success': False,
'message': '手机号格式不正确'
}), 400
# 这里简化处理,实际应查询订单表
# 模拟返回订单信息
order_info = {
'order_number': order_number,
'status': 1, # 1=已支付
'status_name': '已支付',
'amount': 0.01,
'created_time': '2023-01-01T00:00:00',
'payment_time': '2023-01-01T00:05:00'
}
return jsonify({
'success': True,
'data': order_info
})
except Exception as e:
current_app.logger.error(f"查询订单失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
# ==================== 下载相关接口 ====================
@api_bp.route('/user/downloads/check', methods=['GET'])
def check_download_permission():
"""用户端检查下载权限(无需认证)"""
try:
product_id = request.args.get('product_id')
if not product_id:
return jsonify({
'success': False,
'message': '缺少产品ID参数'
}), 400
# 验证产品存在且启用
product = Product.query.filter_by(product_id=product_id, status=1).first()
if not product:
return jsonify({
'success': False,
'message': '产品不存在或已下架'
}), 404
# 获取产品是否为付费产品
is_paid = getattr(product, 'is_paid', False)
# 获取最新版本信息
latest_version = Version.query.filter_by(
product_id=product_id,
publish_status=1
).order_by(Version.create_time.desc()).first()
return jsonify({
'success': True,
'data': {
'product_id': product_id,
'product_name': product.product_name,
'is_paid': is_paid,
'latest_version': latest_version.version_num if latest_version else None,
'download_url': latest_version.download_url if latest_version else None
}
})
except Exception as e:
current_app.logger.error(f"检查下载权限失败: {str(e)}")
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500