diff --git a/api_test.html b/api_test.html new file mode 100644 index 0000000..d15b205 --- /dev/null +++ b/api_test.html @@ -0,0 +1,604 @@ + + + + + + KaMiXiTong API测试平台 + + + +
+

KaMiXiTong API测试平台

+

这是一个用于测试KaMiXiTong系统所有API接口的前端页面。

+ +
+
用户管理
+
工单管理
+
卡密管理
+
版本管理
+
设备管理
+
产品管理
+
+ + +
+
+

创建管理员

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+
+ +
+

获取管理员列表

+ +
+
+ +
+

更新管理员

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+
+
+ + +
+
+

创建工单

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+
+ +
+

获取工单列表

+ +
+
+
+ + +
+
+

生成卡密

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+
+ +
+

获取卡密列表

+ +
+
+
+ + +
+
+

创建版本

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ +
+
+
+ +
+

获取版本列表

+ +
+
+
+ + +
+
+

获取设备列表

+ +
+
+
+ + +
+
+

创建产品

+
+
+ + +
+
+ + +
+ +
+
+
+ +
+

获取产品列表

+ +
+
+
+
+ + + + \ No newline at end of file diff --git a/api_test_app.py b/api_test_app.py new file mode 100644 index 0000000..2f86517 --- /dev/null +++ b/api_test_app.py @@ -0,0 +1,821 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +FastAPI接口测试应用 +提供所有管理功能的API接口测试页面 +""" + +import os +import sys +from datetime import datetime, timedelta +from typing import List, Optional +from pydantic import BaseModel + +from fastapi import FastAPI, HTTPException, Depends, status +from fastapi.middleware.cors import CORSMiddleware +from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey, func +from sqlalchemy.orm import declarative_base, sessionmaker, Session + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# 导入配置 +from config import Config + +# 数据库配置 +DATABASE_URL = Config.SQLALCHEMY_DATABASE_URI +engine = create_engine(DATABASE_URL) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base = declarative_base() + +# 创建FastAPI应用 +app = FastAPI( + title="KaMiXiTong API测试平台", + description="软件授权管理系统的完整API接口测试平台", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc" +) + +# 添加CORS中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# 依赖项 +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + +# ==================== 用户管理模型 ==================== +class AdminBase(BaseModel): + username: str + email: Optional[str] = None + role: Optional[int] = 0 # 0=普通管理员, 1=超级管理员 + status: Optional[int] = 1 # 0=禁用, 1=正常 + + class Config: + from_attributes = True + +class AdminCreate(AdminBase): + password: str + +class AdminUpdate(AdminBase): + password: Optional[str] = None + +class AdminInDB(AdminBase): + admin_id: int + create_time: datetime + update_time: datetime + +# ==================== 工单管理模型 ==================== +class TicketBase(BaseModel): + title: str + product_id: str + description: str + priority: Optional[int] = 1 # 1=低, 2=中, 3=高 + status: Optional[int] = 0 # 0=待处理, 1=处理中, 2=已解决, 3=已关闭 + + class Config: + from_attributes = True + +class TicketCreate(TicketBase): + software_version: Optional[str] = None + machine_code: Optional[str] = None + license_key: Optional[str] = None + +class TicketUpdate(TicketBase): + pass + +class TicketInDB(TicketBase): + ticket_id: int + create_time: datetime + update_time: datetime + +# ==================== 卡密管理模型 ==================== +class LicenseBase(BaseModel): + product_id: str + type: int = 1 # 0=试用, 1=正式 + status: Optional[int] = 0 # 0=未使用, 1=已使用, 2=已过期, 3=已禁用 + valid_days: Optional[int] = 365 + + class Config: + from_attributes = True + +class LicenseCreate(LicenseBase): + count: int = 1 + prefix: Optional[str] = "" + length: Optional[int] = 32 + +class LicenseUpdate(LicenseBase): + pass + +class LicenseInDB(LicenseBase): + license_id: int + license_key: str + create_time: datetime + update_time: datetime + expire_time: Optional[datetime] = None + +# ==================== 版本管理模型 ==================== +class VersionBase(BaseModel): + product_id: str + version_num: str + platform: Optional[str] = "" + description: Optional[str] = "" + update_log: Optional[str] = "" + download_url: Optional[str] = "" + file_hash: Optional[str] = "" + force_update: Optional[int] = 0 + download_status: Optional[int] = 1 # 0=下架, 1=上架 + min_license_version: Optional[str] = "" + publish_status: Optional[int] = 0 # 0=未发布, 1=已发布 + + class Config: + from_attributes = True + +class VersionCreate(VersionBase): + publish_now: Optional[bool] = False + +class VersionUpdate(VersionBase): + pass + +class VersionInDB(VersionBase): + version_id: int + create_time: datetime + update_time: datetime + +# ==================== 设备管理模型 ==================== +class DeviceBase(BaseModel): + product_id: str + machine_code: str + software_version: Optional[str] = "" + status: Optional[int] = 1 # 0=禁用, 1=正常, 2=黑名单 + + class Config: + from_attributes = True + +class DeviceCreate(DeviceBase): + license_key: Optional[str] = None + +class DeviceUpdate(DeviceBase): + pass + +class DeviceInDB(DeviceBase): + device_id: int + create_time: datetime + last_verify_time: Optional[datetime] = None + +# ==================== 产品管理模型 ==================== +class ProductBase(BaseModel): + product_name: str + description: Optional[str] = "" + status: Optional[int] = 1 # 0=禁用, 1=正常 + + class Config: + from_attributes = True + +class ProductCreate(ProductBase): + product_id: Optional[str] = None + +class ProductUpdate(ProductBase): + pass + +class ProductInDB(ProductBase): + product_id: str + create_time: datetime + update_time: datetime + +# ==================== 数据库模型 ==================== +# 用户管理表 +class DBAdmin(Base): + __tablename__ = "admin" + + admin_id = Column(Integer, primary_key=True) + username = Column(String(32), unique=True, nullable=False) + password_hash = Column(String(128), nullable=False) + email = Column(String(128), nullable=True) + role = Column(Integer, default=0) # 0=普通管理员, 1=超级管理员 + status = Column(Integer, default=1) # 0=禁用, 1=正常 + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + is_deleted = Column(Integer, default=0) # 软删除标志 + + def __init__(self, **kwargs): + super(DBAdmin, self).__init__(**kwargs) + +# 工单表 +class DBTicket(Base): + __tablename__ = "ticket" + + ticket_id = Column(Integer, primary_key=True) + title = Column(String(128), nullable=False) + product_id = Column(String(32), nullable=False) + software_version = Column(String(32), nullable=True) + machine_code = Column(String(64), nullable=True) + license_key = Column(String(64), nullable=True) + description = Column(Text, nullable=False) + priority = Column(Integer, default=1) # 1=低, 2=中, 3=高 + status = Column(Integer, default=0) # 0=待处理, 1=处理中, 2=已解决, 3=已关闭 + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __init__(self, **kwargs): + super(DBTicket, self).__init__(**kwargs) + +# 卡密表 +class DBLicense(Base): + __tablename__ = "license" + + license_id = Column(Integer, primary_key=True) + product_id = Column(String(32), nullable=False) + license_key = Column(String(64), unique=True, nullable=False) + type = Column(Integer, default=1) # 0=试用, 1=正式 + status = Column(Integer, default=0) # 0=未使用, 1=已使用, 2=已过期, 3=已禁用 + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + expire_time = Column(DateTime, nullable=True) + + def __init__(self, **kwargs): + super(DBLicense, self).__init__(**kwargs) + +# 版本表 +class DBVersion(Base): + __tablename__ = "version" + + version_id = Column(Integer, primary_key=True) + product_id = Column(String(32), nullable=False) + version_num = Column(String(32), nullable=False) + platform = Column(String(32), nullable=True) + description = Column(Text, nullable=True) + update_log = Column(Text, nullable=True) + download_url = Column(String(256), nullable=True) + file_hash = Column(String(64), nullable=True) + force_update = Column(Integer, default=0) + download_status = Column(Integer, default=1) # 0=下架, 1=上架 + min_license_version = Column(String(32), nullable=True) + publish_status = Column(Integer, default=0) # 0=未发布, 1=已发布 + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __init__(self, **kwargs): + super(DBVersion, self).__init__(**kwargs) + +# 设备表 +class DBDevice(Base): + __tablename__ = "device" + + device_id = Column(Integer, primary_key=True) + product_id = Column(String(32), nullable=False) + machine_code = Column(String(64), nullable=False) + software_version = Column(String(32), nullable=True) + status = Column(Integer, default=1) # 0=禁用, 1=正常, 2=黑名单 + create_time = Column(DateTime, default=datetime.utcnow) + last_verify_time = Column(DateTime, nullable=True) + + def __init__(self, **kwargs): + super(DBDevice, self).__init__(**kwargs) + +# 产品表 +class DBProduct(Base): + __tablename__ = "product" + + product_id = Column(String(32), primary_key=True) + product_name = Column(String(64), nullable=False) + description = Column(Text, nullable=True) + status = Column(Integer, default=1) # 0=禁用, 1=正常 + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __init__(self, **kwargs): + super(DBProduct, self).__init__(**kwargs) + +# ==================== 用户管理接口 ==================== +@app.get("/") +async def root(): + return {"message": "欢迎使用KaMiXiTong API测试平台", "version": "1.0.0"} + +@app.get("/admins", response_model=List[AdminInDB]) +async def get_admins( + skip: int = 0, + limit: int = 100, + keyword: Optional[str] = None, + role: Optional[int] = None, + status: Optional[int] = None, + db: Session = Depends(get_db) +): + """获取管理员列表""" + query = db.query(DBAdmin).filter(DBAdmin.is_deleted == 0) + + if keyword: + query = query.filter(DBAdmin.username.contains(keyword)) + + if role is not None: + query = query.filter(DBAdmin.role == role) + + if status is not None: + query = query.filter(DBAdmin.status == status) + + admins = query.offset(skip).limit(limit).all() + return admins + +@app.post("/admins", response_model=AdminInDB) +async def create_admin(admin: AdminCreate, db: Session = Depends(get_db)): + """创建管理员""" + # 检查用户名是否已存在 + existing = db.query(DBAdmin).filter( + DBAdmin.username == admin.username, + DBAdmin.is_deleted == 0 + ).first() + + if existing: + raise HTTPException(status_code=400, detail="用户名已存在") + + # 创建管理员(简化密码处理) + db_admin = DBAdmin( + username=admin.username, + email=admin.email, + role=admin.role, + status=admin.status, + password_hash=f"hashed_{admin.password}" # 简化处理 + ) + + db.add(db_admin) + db.commit() + db.refresh(db_admin) + return db_admin + +@app.get("/admins/{admin_id}", response_model=AdminInDB) +async def get_admin(admin_id: int, db: Session = Depends(get_db)): + """获取管理员详情""" + admin = db.query(DBAdmin).filter( + DBAdmin.admin_id == admin_id, + DBAdmin.is_deleted == 0 + ).first() + + if not admin: + raise HTTPException(status_code=404, detail="管理员不存在") + return admin + +@app.put("/admins/{admin_id}", response_model=AdminInDB) +async def update_admin(admin_id: int, admin: AdminUpdate, db: Session = Depends(get_db)): + """更新管理员""" + db_admin = db.query(DBAdmin).filter( + DBAdmin.admin_id == admin_id, + DBAdmin.is_deleted == 0 + ).first() + + if not db_admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + # 更新字段 + if admin.username and admin.username != db_admin.username: + # 检查新用户名是否已存在 + existing = db.query(DBAdmin).filter( + DBAdmin.username == admin.username, + DBAdmin.admin_id != admin_id, + DBAdmin.is_deleted == 0 + ).first() + + if existing: + raise HTTPException(status_code=400, detail="用户名已存在") + db_admin.username = admin.username + + if admin.email is not None: + db_admin.email = admin.email + if admin.role is not None: + db_admin.role = admin.role + if admin.status is not None: + db_admin.status = admin.status + if admin.password: + db_admin.password_hash = f"hashed_{admin.password}" # 简化处理 + + db.commit() + db.refresh(db_admin) + return db_admin + +@app.delete("/admins/{admin_id}") +async def delete_admin(admin_id: int, db: Session = Depends(get_db)): + """删除管理员(软删除)""" + db_admin = db.query(DBAdmin).filter( + DBAdmin.admin_id == admin_id, + DBAdmin.is_deleted == 0 + ).first() + + if not db_admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + db_admin.is_deleted = 1 + db.commit() + return {"message": "管理员删除成功"} + +@app.post("/admins/{admin_id}/toggle-status") +async def toggle_admin_status(admin_id: int, db: Session = Depends(get_db)): + """切换管理员状态""" + db_admin = db.query(DBAdmin).filter( + DBAdmin.admin_id == admin_id, + DBAdmin.is_deleted == 0 + ).first() + + if not db_admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + db_admin.status = 0 if db_admin.status == 1 else 1 + db.commit() + + status_name = "正常" if db_admin.status == 1 else "禁用" + action = "启用" if db_admin.status == 1 else "禁用" + return {"message": f"管理员已{action}", "status": db_admin.status, "status_name": status_name} + +# ==================== 工单管理接口 ==================== +@app.get("/tickets", response_model=List[TicketInDB]) +async def get_tickets( + skip: int = 0, + limit: int = 100, + status: Optional[int] = None, + priority: Optional[int] = None, + product_id: Optional[str] = None, + db: Session = Depends(get_db) +): + """获取工单列表""" + query = db.query(DBTicket) + + if status is not None: + query = query.filter(DBTicket.status == status) + if priority is not None: + query = query.filter(DBTicket.priority == priority) + if product_id: + query = query.filter(DBTicket.product_id == product_id) + + query = query.order_by(DBTicket.create_time.desc()) + tickets = query.offset(skip).limit(limit).all() + return tickets + +@app.post("/tickets", response_model=TicketInDB) +async def create_ticket(ticket: TicketCreate, db: Session = Depends(get_db)): + """创建工单""" + # 验证产品存在(简化处理) + db_ticket = DBTicket(**ticket.model_dump()) + db.add(db_ticket) + db.commit() + db.refresh(db_ticket) + return db_ticket + +@app.put("/tickets/batch/status") +async def batch_update_ticket_status( + ticket_ids: List[int], + status: int, + remark: Optional[str] = None, + db: Session = Depends(get_db) +): + """批量更新工单状态""" + if status not in [0, 1, 2, 3]: + raise HTTPException(status_code=400, detail="无效的状态值") + + # 查找所有要更新的工单 + tickets = db.query(DBTicket).filter(DBTicket.ticket_id.in_(ticket_ids)).all() + if len(tickets) != len(ticket_ids): + found_ids = [t.ticket_id for t in tickets] + missing_ids = [tid for tid in ticket_ids if tid not in found_ids] + raise HTTPException(status_code=404, detail=f"以下工单不存在: {', '.join(map(str, missing_ids))}") + + # 批量更新工单状态 + for ticket in tickets: + ticket.status = status + ticket.update_time = datetime.utcnow() + + db.commit() + + status_names = {0: '待处理', 1: '处理中', 2: '已解决', 3: '已关闭'} + status_name = status_names.get(status, '未知') + return {"message": f"成功将 {len(tickets)} 个工单状态更新为{status_name}"} + +# ==================== 卡密管理接口 ==================== +@app.get("/licenses", response_model=List[LicenseInDB]) +async def get_licenses( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + status: Optional[int] = None, + license_type: Optional[int] = None, + keyword: Optional[str] = None, + db: Session = Depends(get_db) +): + """获取卡密列表""" + query = db.query(DBLicense) + + if product_id: + query = query.filter(DBLicense.product_id == product_id) + if status is not None: + query = query.filter(DBLicense.status == status) + if license_type is not None: + query = query.filter(DBLicense.type == license_type) + if keyword: + query = query.filter(func.lower(DBLicense.license_key).like(f"%{keyword.lower()}%")) + + query = query.order_by(DBLicense.create_time.desc()) + licenses = query.offset(skip).limit(limit).all() + return licenses + +@app.post("/licenses", response_model=dict) +async def generate_licenses(license: LicenseCreate, db: Session = Depends(get_db)): + """批量生成卡密""" + # 验证参数 + if license.count < 1 or license.count > 10000: + raise HTTPException(status_code=400, detail="生成数量必须在1-10000之间") + + if license.length < 16 or license.length > 35: + raise HTTPException(status_code=400, detail="卡密长度必须在16-35之间") + + # 试用卡密最大有效期限制 + if license.type == 0 and license.valid_days and license.valid_days > 90: + raise HTTPException(status_code=400, detail="试用卡密有效期不能超过90天") + + # 生成卡密(简化处理) + import secrets + import string + + licenses = [] + characters = string.ascii_uppercase + string.digits + + for i in range(license.count): + # 生成卡密 + key = license.prefix + ''.join(secrets.choice(characters) for _ in range(license.length - len(license.prefix))) + + # 确保卡密唯一 + max_attempts = 10 + for attempt in range(max_attempts): + existing = db.query(DBLicense).filter(DBLicense.license_key == key).first() + if not existing: + break + key = license.prefix + ''.join(secrets.choice(characters) for _ in range(license.length - len(license.prefix))) + else: + raise HTTPException(status_code=500, detail="无法生成唯一的卡密,请稍后重试") + + # 计算过期时间 + expire_time = None + if license.valid_days: + expire_time = datetime.utcnow() + timedelta(days=license.valid_days) + + # 创建卡密对象 + db_license = DBLicense( + product_id=license.product_id, + license_key=key, + type=license.type, + status=0, # 未使用 + expire_time=expire_time + ) + licenses.append(db_license) + + # 批量保存 + db.add_all(licenses) + db.commit() + + # 格式化结果 + license_data = [] + for db_license in licenses: + db.refresh(db_license) + license_data.append(LicenseInDB.model_validate(db_license)) + + return { + "message": f"成功生成 {license.count} 个卡密", + "licenses": license_data, + "count": len(licenses) + } + +# ==================== 版本管理接口 ==================== +@app.get("/versions", response_model=List[VersionInDB]) +async def get_versions( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + publish_status: Optional[int] = None, + db: Session = Depends(get_db) +): + """获取版本列表""" + query = db.query(DBVersion) + + if product_id: + query = query.filter(DBVersion.product_id == product_id) + if publish_status is not None: + query = query.filter(DBVersion.publish_status == publish_status) + + query = query.order_by(DBVersion.create_time.desc()) + versions = query.offset(skip).limit(limit).all() + return versions + +@app.post("/versions", response_model=VersionInDB) +async def create_version(version: VersionCreate, db: Session = Depends(get_db)): + """创建版本""" + # 验证产品存在(简化处理) + if not version.product_id or not version.version_num: + raise HTTPException(status_code=400, detail="缺少必要参数") + + # 检查版本号是否重复 + existing = db.query(DBVersion).filter( + DBVersion.product_id == version.product_id, + DBVersion.version_num == version.version_num + ).first() + + if existing: + raise HTTPException(status_code=400, detail="版本号已存在") + + # 创建版本 + db_version = DBVersion(**version.model_dump(exclude={'publish_now'})) + db.add(db_version) + db.commit() + db.refresh(db_version) + + # 如果选择了立即发布,则发布版本 + if version.publish_now: + db_version.publish_status = 1 + db.commit() + db.refresh(db_version) + + return db_version + +@app.post("/versions/{version_id}/publish") +async def publish_version(version_id: int, db: Session = Depends(get_db)): + """发布版本""" + version = db.query(DBVersion).filter(DBVersion.version_id == version_id).first() + if not version: + raise HTTPException(status_code=404, detail="版本不存在") + + version.publish_status = 1 + version.update_time = datetime.utcnow() + db.commit() + db.refresh(version) + + return {"message": "版本发布成功", "version": VersionInDB.model_validate(version)} + +# ==================== 设备管理接口 ==================== +@app.get("/devices", response_model=List[DeviceInDB]) +async def get_devices( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + software_version: Optional[str] = None, + status: Optional[int] = None, + keyword: Optional[str] = None, + db: Session = Depends(get_db) +): + """获取设备列表""" + query = db.query(DBDevice) + + if product_id: + query = query.filter(DBDevice.product_id == product_id) + if software_version: + query = query.filter(DBDevice.software_version == software_version) + if status is not None: + query = query.filter(DBDevice.status == status) + if keyword: + query = query.filter(DBDevice.machine_code.contains(keyword)) + + query = query.order_by(DBDevice.last_verify_time.desc()) + devices = query.offset(skip).limit(limit).all() + return devices + +@app.put("/devices/{device_id}/status") +async def update_device_status(device_id: int, status: int, db: Session = Depends(get_db)): + """更新设备状态""" + if status not in [0, 1, 2]: + raise HTTPException(status_code=400, detail="无效的状态值") + + device = db.query(DBDevice).filter(DBDevice.device_id == device_id).first() + if not device: + raise HTTPException(status_code=404, detail="设备不存在") + + device.status = status + device.last_verify_time = datetime.utcnow() + db.commit() + db.refresh(device) + + return {"message": "设备状态更新成功", "device": DeviceInDB.model_validate(device)} + +@app.delete("/devices/{device_id}") +async def delete_device(device_id: int, db: Session = Depends(get_db)): + """删除设备""" + device = db.query(DBDevice).filter(DBDevice.device_id == device_id).first() + if not device: + raise HTTPException(status_code=404, detail="设备不存在") + + db.delete(device) + db.commit() + return {"message": "设备删除成功"} + +@app.delete("/devices/batch") +async def batch_delete_devices(device_ids: List[int], db: Session = Depends(get_db)): + """批量删除设备""" + # 查找所有要删除的设备 + devices = db.query(DBDevice).filter(DBDevice.device_id.in_(device_ids)).all() + if len(devices) != len(device_ids): + found_ids = [d.device_id for d in devices] + missing_ids = [did for did in device_ids if did not in found_ids] + raise HTTPException(status_code=404, detail=f"以下设备不存在: {', '.join(map(str, missing_ids))}") + + # 批量删除设备 + for device in devices: + db.delete(device) + + db.commit() + return {"message": f"成功删除 {len(devices)} 个设备"} + +# ==================== 产品管理接口 ==================== +@app.get("/products", response_model=List[ProductInDB]) +async def get_products( + skip: int = 0, + limit: int = 100, + keyword: Optional[str] = None, + db: Session = Depends(get_db) +): + """获取产品列表""" + query = db.query(DBProduct) + + if keyword: + query = query.filter( + DBProduct.product_name.like(f"%{keyword}%") | + DBProduct.description.like(f"%{keyword}%") + ) + + query = query.order_by(DBProduct.create_time.desc()) + products = query.offset(skip).limit(limit).all() + return products + +@app.post("/products", response_model=ProductInDB) +async def create_product(product: ProductCreate, db: Session = Depends(get_db)): + """创建产品""" + if not product.product_name.strip(): + raise HTTPException(status_code=400, detail="产品名称不能为空") + + # 检查自定义ID是否重复 + if product.product_id: + existing = db.query(DBProduct).filter(DBProduct.product_id == product.product_id).first() + if existing: + raise HTTPException(status_code=400, detail="产品ID已存在") + + # 创建产品 + import uuid + product_id = product.product_id if product.product_id else f"PROD_{uuid.uuid4().hex[:8]}".upper() + + db_product = DBProduct( + product_id=product_id, + product_name=product.product_name, + description=product.description, + status=product.status + ) + + db.add(db_product) + db.commit() + db.refresh(db_product) + return db_product + +@app.get("/products/{product_id}", response_model=ProductInDB) +async def get_product(product_id: str, db: Session = Depends(get_db)): + """获取产品详情""" + product = db.query(DBProduct).filter(DBProduct.product_id == product_id).first() + if not product: + raise HTTPException(status_code=404, detail="产品不存在") + return product + +@app.put("/products/{product_id}", response_model=ProductInDB) +async def update_product(product_id: str, product: ProductUpdate, db: Session = Depends(get_db)): + """更新产品""" + db_product = db.query(DBProduct).filter(DBProduct.product_id == product_id).first() + if not db_product: + raise HTTPException(status_code=404, detail="产品不存在") + + # 更新字段 + if product.product_name is not None: + db_product.product_name = product.product_name + if product.description is not None: + db_product.description = product.description + if product.status is not None: + db_product.status = product.status + + db_product.update_time = datetime.utcnow() + db.commit() + db.refresh(db_product) + return db_product + +@app.delete("/products/{product_id}") +async def delete_product(product_id: str, db: Session = Depends(get_db)): + """删除产品""" + product = db.query(DBProduct).filter(DBProduct.product_id == product_id).first() + if not product: + raise HTTPException(status_code=404, detail="产品不存在") + + db.delete(product) + db.commit() + return {"message": "产品删除成功"} + +if __name__ == "__main__": + import uvicorn + # 使用127.0.0.1而不是0.0.0.0来避免权限问题 + uvicorn.run(app, host="127.0.0.1", port=9003, log_level="info") \ No newline at end of file diff --git a/api_test_app_mysql.py b/api_test_app_mysql.py new file mode 100644 index 0000000..5768bfb --- /dev/null +++ b/api_test_app_mysql.py @@ -0,0 +1,1108 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +FastAPI接口测试应用 (MySQL版本) +提供所有管理功能的API接口测试页面,直接使用MySQL数据库 +""" + +import os +import sys +import pymysql +from datetime import datetime, timedelta +from typing import List, Optional +from pydantic import BaseModel + +from fastapi import FastAPI, HTTPException, Depends, status +from fastapi.middleware.cors import CORSMiddleware + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# 导入配置 +from config import Config + +# 解析数据库URL +def parse_database_url(url): + """解析数据库URL""" + # 格式: mysql+pymysql://user:password@host:port/database + try: + # 移除mysql+pymysql://前缀 + url = url.replace('mysql+pymysql://', '') + + # 找到@符号前的用户名密码 + at_index = url.index('@') + user_pass = url[:at_index] + url = url[at_index + 1:] + + # 分割用户名和密码 + user, password = user_pass.split(':', 1) + + # 找到数据库名 + if '/' in url: + host_port, database = url.split('/', 1) + # 提取端口 + if ':' in host_port: + host, port = host_port.split(':', 1) + else: + host = host_port + port = '3306' + else: + raise ValueError("URL格式不正确") + + return { + 'user': user, + 'password': password, + 'host': host, + 'port': port, + 'database': database + } + except Exception as e: + print(f"无法解析数据库URL: {e}") + return None + +# 获取数据库连接 +def get_db_connection(): + """获取MySQL数据库连接""" + db_config = parse_database_url(Config.SQLALCHEMY_DATABASE_URI) + if not db_config: + raise Exception("无法解析数据库配置") + + connection = pymysql.connect( + host=db_config['host'], + port=int(db_config['port']), + user=db_config['user'], + password=db_config['password'], + database=db_config['database'], + charset='utf8mb4', + cursorclass=pymysql.cursors.DictCursor + ) + return connection + +# 创建FastAPI应用 +app = FastAPI( + title="KaMiXiTong API测试平台 (MySQL版)", + description="软件授权管理系统的完整API接口测试平台,使用MySQL数据库", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc" +) + +# 添加CORS中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# ==================== 用户管理模型 ==================== +class AdminBase(BaseModel): + username: str + email: Optional[str] = None + role: Optional[int] = 0 # 0=普通管理员, 1=超级管理员 + status: Optional[int] = 1 # 0=禁用, 1=正常 + + class Config: + from_attributes = True + +class AdminCreate(AdminBase): + password: str + +class AdminUpdate(AdminBase): + password: Optional[str] = None + +class AdminInDB(AdminBase): + admin_id: int + create_time: datetime + update_time: datetime + +# ==================== 工单管理模型 ==================== +class TicketBase(BaseModel): + title: str + product_id: str + description: str + priority: Optional[int] = 1 # 1=低, 2=中, 3=高 + status: Optional[int] = 0 # 0=待处理, 1=处理中, 2=已解决, 3=已关闭 + + class Config: + from_attributes = True + +class TicketCreate(TicketBase): + software_version: Optional[str] = None + machine_code: Optional[str] = None + license_key: Optional[str] = None + +class TicketUpdate(TicketBase): + pass + +class TicketInDB(TicketBase): + ticket_id: int + create_time: datetime + update_time: datetime + +# ==================== 卡密管理模型 ==================== +class LicenseBase(BaseModel): + product_id: str + type: int = 1 # 0=试用, 1=正式 + status: Optional[int] = 0 # 0=未使用, 1=已使用, 2=已过期, 3=已禁用 + valid_days: Optional[int] = 365 + + class Config: + from_attributes = True + +class LicenseCreate(LicenseBase): + count: int = 1 + prefix: Optional[str] = "" + length: Optional[int] = 32 + +class LicenseUpdate(LicenseBase): + pass + +class LicenseInDB(LicenseBase): + license_id: int + license_key: str + create_time: datetime + update_time: datetime + expire_time: Optional[datetime] = None + +# ==================== 版本管理模型 ==================== +class VersionBase(BaseModel): + product_id: str + version_num: str + platform: Optional[str] = "" + description: Optional[str] = "" + update_log: Optional[str] = "" + download_url: Optional[str] = "" + file_hash: Optional[str] = "" + force_update: Optional[int] = 0 + download_status: Optional[int] = 1 # 0=下架, 1=上架 + min_license_version: Optional[str] = "" + publish_status: Optional[int] = 0 # 0=未发布, 1=已发布 + + class Config: + from_attributes = True + +class VersionCreate(VersionBase): + publish_now: Optional[bool] = False + +class VersionUpdate(VersionBase): + pass + +class VersionInDB(VersionBase): + version_id: int + create_time: datetime + update_time: datetime + +# ==================== 设备管理模型 ==================== +class DeviceBase(BaseModel): + product_id: str + machine_code: str + software_version: Optional[str] = "" + status: Optional[int] = 1 # 0=禁用, 1=正常, 2=黑名单 + + class Config: + from_attributes = True + +class DeviceCreate(DeviceBase): + license_key: Optional[str] = None + +class DeviceUpdate(DeviceBase): + pass + +class DeviceInDB(DeviceBase): + device_id: int + create_time: datetime + last_verify_time: Optional[datetime] = None + +# ==================== 产品管理模型 ==================== +class ProductBase(BaseModel): + product_name: str + description: Optional[str] = "" + status: Optional[int] = 1 # 0=禁用, 1=正常 + + class Config: + from_attributes = True + +class ProductCreate(ProductBase): + product_id: Optional[str] = None + +class ProductUpdate(ProductBase): + pass + +class ProductInDB(ProductBase): + product_id: str + create_time: datetime + update_time: datetime + +# ==================== 用户管理接口 ==================== +@app.get("/") +async def root(): + return {"message": "欢迎使用KaMiXiTong API测试平台 (MySQL版)", "version": "1.0.0"} + +@app.get("/admins", response_model=List[AdminInDB]) +async def get_admins( + skip: int = 0, + limit: int = 100, + keyword: Optional[str] = None, + role: Optional[int] = None, + status: Optional[int] = None +): + """获取管理员列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM admin WHERE is_deleted = 0" + params = [] + + if keyword: + sql += " AND username LIKE %s" + params.append(f"%{keyword}%") + + if role is not None: + sql += " AND role = %s" + params.append(role) + + if status is not None: + sql += " AND status = %s" + params.append(status) + + sql += " ORDER BY create_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + admins = cursor.fetchall() + + connection.close() + return admins + except Exception as e: + raise HTTPException(status_code=500, detail=f"数据库查询失败: {str(e)}") + +@app.post("/admins", response_model=AdminInDB) +async def create_admin(admin: AdminCreate): + """创建管理员""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查用户名是否已存在 + cursor.execute( + "SELECT admin_id FROM admin WHERE username = %s AND is_deleted = 0", + (admin.username,) + ) + existing = cursor.fetchone() + + if existing: + raise HTTPException(status_code=400, detail="用户名已存在") + + # 创建管理员(简化密码处理) + sql = """ + INSERT INTO admin (username, email, password_hash, role, status, create_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """ + params = ( + admin.username, + admin.email, + f"hashed_{admin.password}", # 简化处理 + admin.role, + admin.status, + datetime.utcnow(), + datetime.utcnow() + ) + + cursor.execute(sql, params) + admin_id = cursor.lastrowid + + connection.commit() + + # 查询创建的管理员 + cursor.execute("SELECT * FROM admin WHERE admin_id = %s", (admin_id,)) + created_admin = cursor.fetchone() + + connection.close() + return created_admin + except Exception as e: + raise HTTPException(status_code=500, detail=f"创建管理员失败: {str(e)}") + +@app.get("/admins/{admin_id}", response_model=AdminInDB) +async def get_admin(admin_id: int): + """获取管理员详情""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + cursor.execute( + "SELECT * FROM admin WHERE admin_id = %s AND is_deleted = 0", + (admin_id,) + ) + admin = cursor.fetchone() + + connection.close() + + if not admin: + raise HTTPException(status_code=404, detail="管理员不存在") + return admin + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询管理员失败: {str(e)}") + +@app.put("/admins/{admin_id}", response_model=AdminInDB) +async def update_admin(admin_id: int, admin: AdminUpdate): + """更新管理员""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查管理员是否存在 + cursor.execute( + "SELECT * FROM admin WHERE admin_id = %s AND is_deleted = 0", + (admin_id,) + ) + existing_admin = cursor.fetchone() + + if not existing_admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + # 检查新用户名是否已存在 + if admin.username and admin.username != existing_admin['username']: + cursor.execute( + "SELECT admin_id FROM admin WHERE username = %s AND admin_id != %s AND is_deleted = 0", + (admin.username, admin_id) + ) + duplicate = cursor.fetchone() + + if duplicate: + raise HTTPException(status_code=400, detail="用户名已存在") + + # 更新字段 + updates = [] + params = [] + + if admin.username is not None: + updates.append("username = %s") + params.append(admin.username) + if admin.email is not None: + updates.append("email = %s") + params.append(admin.email) + if admin.role is not None: + updates.append("role = %s") + params.append(admin.role) + if admin.status is not None: + updates.append("status = %s") + params.append(admin.status) + if admin.password: + updates.append("password_hash = %s") + params.append(f"hashed_{admin.password}") # 简化处理 + if updates: + updates.append("update_time = %s") + params.append(datetime.utcnow()) + params.append(admin_id) + + sql = f"UPDATE admin SET {', '.join(updates)} WHERE admin_id = %s" + cursor.execute(sql, params) + connection.commit() + + # 查询更新后的管理员 + cursor.execute("SELECT * FROM admin WHERE admin_id = %s", (admin_id,)) + updated_admin = cursor.fetchone() + + connection.close() + return updated_admin + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"更新管理员失败: {str(e)}") + +@app.delete("/admins/{admin_id}") +async def delete_admin(admin_id: int): + """删除管理员(软删除)""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查管理员是否存在 + cursor.execute( + "SELECT admin_id FROM admin WHERE admin_id = %s AND is_deleted = 0", + (admin_id,) + ) + admin = cursor.fetchone() + + if not admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + # 软删除 + cursor.execute( + "UPDATE admin SET is_deleted = 1, delete_time = %s WHERE admin_id = %s", + (datetime.utcnow(), admin_id) + ) + connection.commit() + + connection.close() + return {"message": "管理员删除成功"} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"删除管理员失败: {str(e)}") + +@app.post("/admins/{admin_id}/toggle-status") +async def toggle_admin_status(admin_id: int): + """切换管理员状态""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查管理员是否存在 + cursor.execute( + "SELECT * FROM admin WHERE admin_id = %s AND is_deleted = 0", + (admin_id,) + ) + admin = cursor.fetchone() + + if not admin: + raise HTTPException(status_code=404, detail="管理员不存在") + + # 切换状态 + new_status = 0 if admin['status'] == 1 else 1 + cursor.execute( + "UPDATE admin SET status = %s, update_time = %s WHERE admin_id = %s", + (new_status, datetime.utcnow(), admin_id) + ) + connection.commit() + + # 查询更新后的管理员 + cursor.execute("SELECT * FROM admin WHERE admin_id = %s", (admin_id,)) + updated_admin = cursor.fetchone() + + connection.close() + + status_name = "正常" if updated_admin['status'] == 1 else "禁用" + action = "启用" if updated_admin['status'] == 1 else "禁用" + return {"message": f"管理员已{action}", "status": updated_admin['status'], "status_name": status_name} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"切换管理员状态失败: {str(e)}") + +# ==================== 工单管理接口 ==================== +@app.get("/tickets", response_model=List[TicketInDB]) +async def get_tickets( + skip: int = 0, + limit: int = 100, + status: Optional[int] = None, + priority: Optional[int] = None, + product_id: Optional[str] = None +): + """获取工单列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM ticket" + params = [] + conditions = [] + + if status is not None: + conditions.append("status = %s") + params.append(status) + if priority is not None: + conditions.append("priority = %s") + params.append(priority) + if product_id: + conditions.append("product_id = %s") + params.append(product_id) + + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + sql += " ORDER BY create_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + tickets = cursor.fetchall() + + connection.close() + return tickets + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询工单失败: {str(e)}") + +@app.post("/tickets", response_model=TicketInDB) +async def create_ticket(ticket: TicketCreate): + """创建工单""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 创建工单 + sql = """ + INSERT INTO ticket (title, product_id, software_version, machine_code, + license_key, description, priority, status, create_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + params = ( + ticket.title, + ticket.product_id, + ticket.software_version, + ticket.machine_code, + ticket.license_key, + ticket.description, + ticket.priority, + ticket.status, + datetime.utcnow(), + datetime.utcnow() + ) + + cursor.execute(sql, params) + ticket_id = cursor.lastrowid + + connection.commit() + + # 查询创建的工单 + cursor.execute("SELECT * FROM ticket WHERE ticket_id = %s", (ticket_id,)) + created_ticket = cursor.fetchone() + + connection.close() + return created_ticket + except Exception as e: + raise HTTPException(status_code=500, detail=f"创建工单失败: {str(e)}") + +# ==================== 卡密管理接口 ==================== +@app.get("/licenses", response_model=List[LicenseInDB]) +async def get_licenses( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + status: Optional[int] = None, + license_type: Optional[int] = None, + keyword: Optional[str] = None +): + """获取卡密列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM license" + params = [] + conditions = [] + + if product_id: + conditions.append("product_id = %s") + params.append(product_id) + if status is not None: + conditions.append("status = %s") + params.append(status) + if license_type is not None: + conditions.append("type = %s") + params.append(license_type) + if keyword: + conditions.append("license_key LIKE %s") + params.append(f"%{keyword}%") + + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + sql += " ORDER BY create_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + licenses = cursor.fetchall() + + connection.close() + return licenses + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询卡密失败: {str(e)}") + +@app.post("/licenses", response_model=dict) +async def generate_licenses(license: LicenseCreate): + """批量生成卡密""" + try: + # 验证参数 + if license.count < 1 or license.count > 10000: + raise HTTPException(status_code=400, detail="生成数量必须在1-10000之间") + + if license.length < 16 or license.length > 35: + raise HTTPException(status_code=400, detail="卡密长度必须在16-35之间") + + # 试用卡密最大有效期限制 + if license.type == 0 and license.valid_days and license.valid_days > 90: + raise HTTPException(status_code=400, detail="试用卡密有效期不能超过90天") + + # 生成卡密 + import secrets + import string + + connection = get_db_connection() + with connection.cursor() as cursor: + licenses = [] + characters = string.ascii_uppercase + string.digits + + for i in range(license.count): + # 生成卡密 + key = license.prefix + ''.join(secrets.choice(characters) for _ in range(license.length - len(license.prefix))) + + # 确保卡密唯一 + max_attempts = 10 + for attempt in range(max_attempts): + cursor.execute("SELECT license_id FROM license WHERE license_key = %s", (key,)) + existing = cursor.fetchone() + if not existing: + break + key = license.prefix + ''.join(secrets.choice(characters) for _ in range(license.length - len(license.prefix))) + else: + raise HTTPException(status_code=500, detail="无法生成唯一的卡密,请稍后重试") + + # 计算过期时间 + expire_time = None + if license.valid_days: + expire_time = datetime.utcnow() + timedelta(days=license.valid_days) + + # 创建卡密 + sql = """ + INSERT INTO license (product_id, license_key, type, status, create_time, update_time, expire_time) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """ + params = ( + license.product_id, + key, + license.type, + 0, # 未使用 + datetime.utcnow(), + datetime.utcnow(), + expire_time + ) + + cursor.execute(sql, params) + license_id = cursor.lastrowid + + # 查询创建的卡密 + cursor.execute("SELECT * FROM license WHERE license_id = %s", (license_id,)) + created_license = cursor.fetchone() + licenses.append(created_license) + + connection.commit() + + connection.close() + + return { + "message": f"成功生成 {license.count} 个卡密", + "licenses": licenses, + "count": len(licenses) + } + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"生成卡密失败: {str(e)}") + +# ==================== 版本管理接口 ==================== +@app.get("/versions", response_model=List[VersionInDB]) +async def get_versions( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + publish_status: Optional[int] = None +): + """获取版本列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM version" + params = [] + conditions = [] + + if product_id: + conditions.append("product_id = %s") + params.append(product_id) + if publish_status is not None: + conditions.append("publish_status = %s") + params.append(publish_status) + + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + sql += " ORDER BY create_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + versions = cursor.fetchall() + + connection.close() + return versions + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询版本失败: {str(e)}") + +@app.post("/versions", response_model=VersionInDB) +async def create_version(version: VersionCreate): + """创建版本""" + try: + # 验证参数 + if not version.product_id or not version.version_num: + raise HTTPException(status_code=400, detail="缺少必要参数") + + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查版本号是否重复 + cursor.execute( + "SELECT version_id FROM version WHERE product_id = %s AND version_num = %s", + (version.product_id, version.version_num) + ) + existing = cursor.fetchone() + + if existing: + raise HTTPException(status_code=400, detail="版本号已存在") + + # 创建版本 + sql = """ + INSERT INTO version (product_id, version_num, platform, description, update_log, + download_url, file_hash, force_update, download_status, + min_license_version, publish_status, create_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + params = ( + version.product_id, + version.version_num, + version.platform, + version.description, + version.update_log, + version.download_url, + version.file_hash, + version.force_update, + version.download_status, + version.min_license_version, + version.publish_status, + datetime.utcnow(), + datetime.utcnow() + ) + + cursor.execute(sql, params) + version_id = cursor.lastrowid + + connection.commit() + + # 如果选择了立即发布,则发布版本 + if version.publish_now: + cursor.execute( + "UPDATE version SET publish_status = 1, update_time = %s WHERE version_id = %s", + (datetime.utcnow(), version_id) + ) + connection.commit() + + # 查询创建的版本 + cursor.execute("SELECT * FROM version WHERE version_id = %s", (version_id,)) + created_version = cursor.fetchone() + + connection.close() + return created_version + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"创建版本失败: {str(e)}") + +@app.post("/versions/{version_id}/publish") +async def publish_version(version_id: int): + """发布版本""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查版本是否存在 + cursor.execute("SELECT * FROM version WHERE version_id = %s", (version_id,)) + version = cursor.fetchone() + + if not version: + raise HTTPException(status_code=404, detail="版本不存在") + + # 发布版本 + cursor.execute( + "UPDATE version SET publish_status = 1, update_time = %s WHERE version_id = %s", + (datetime.utcnow(), version_id) + ) + connection.commit() + + # 查询更新后的版本 + cursor.execute("SELECT * FROM version WHERE version_id = %s", (version_id,)) + updated_version = cursor.fetchone() + + connection.close() + return {"message": "版本发布成功", "version": updated_version} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"发布版本失败: {str(e)}") + +# ==================== 设备管理接口 ==================== +@app.get("/devices", response_model=List[DeviceInDB]) +async def get_devices( + skip: int = 0, + limit: int = 100, + product_id: Optional[str] = None, + software_version: Optional[str] = None, + status: Optional[int] = None, + keyword: Optional[str] = None +): + """获取设备列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM device" + params = [] + conditions = [] + + if product_id: + conditions.append("product_id = %s") + params.append(product_id) + if software_version: + conditions.append("software_version = %s") + params.append(software_version) + if status is not None: + conditions.append("status = %s") + params.append(status) + if keyword: + conditions.append("machine_code LIKE %s") + params.append(f"%{keyword}%") + + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + sql += " ORDER BY last_verify_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + devices = cursor.fetchall() + + connection.close() + return devices + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询设备失败: {str(e)}") + +@app.put("/devices/{device_id}/status") +async def update_device_status(device_id: int, status: int): + """更新设备状态""" + if status not in [0, 1, 2]: + raise HTTPException(status_code=400, detail="无效的状态值") + + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查设备是否存在 + cursor.execute("SELECT * FROM device WHERE device_id = %s", (device_id,)) + device = cursor.fetchone() + + if not device: + raise HTTPException(status_code=404, detail="设备不存在") + + # 更新设备状态 + cursor.execute( + "UPDATE device SET status = %s, last_verify_time = %s WHERE device_id = %s", + (status, datetime.utcnow(), device_id) + ) + connection.commit() + + # 查询更新后的设备 + cursor.execute("SELECT * FROM device WHERE device_id = %s", (device_id,)) + updated_device = cursor.fetchone() + + connection.close() + return {"message": "设备状态更新成功", "device": updated_device} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"更新设备状态失败: {str(e)}") + +@app.delete("/devices/{device_id}") +async def delete_device(device_id: int): + """删除设备""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查设备是否存在 + cursor.execute("SELECT device_id FROM device WHERE device_id = %s", (device_id,)) + device = cursor.fetchone() + + if not device: + raise HTTPException(status_code=404, detail="设备不存在") + + # 删除设备 + cursor.execute("DELETE FROM device WHERE device_id = %s", (device_id,)) + connection.commit() + + connection.close() + return {"message": "设备删除成功"} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"删除设备失败: {str(e)}") + +# ==================== 产品管理接口 ==================== +@app.get("/products", response_model=List[ProductInDB]) +async def get_products( + skip: int = 0, + limit: int = 100, + keyword: Optional[str] = None +): + """获取产品列表""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 构建查询 + sql = "SELECT * FROM product" + params = [] + conditions = [] + + if keyword: + conditions.append("(product_name LIKE %s OR description LIKE %s)") + params.extend([f"%{keyword}%", f"%{keyword}%"]) + + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + sql += " ORDER BY create_time DESC LIMIT %s OFFSET %s" + params.extend([limit, skip]) + + cursor.execute(sql, params) + products = cursor.fetchall() + + connection.close() + return products + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询产品失败: {str(e)}") + +@app.post("/products", response_model=ProductInDB) +async def create_product(product: ProductCreate): + """创建产品""" + if not product.product_name.strip(): + raise HTTPException(status_code=400, detail="产品名称不能为空") + + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查自定义ID是否重复 + if product.product_id: + cursor.execute( + "SELECT product_id FROM product WHERE product_id = %s", + (product.product_id,) + ) + existing = cursor.fetchone() + if existing: + raise HTTPException(status_code=400, detail="产品ID已存在") + + # 创建产品 + import uuid + product_id = product.product_id if product.product_id else f"PROD_{uuid.uuid4().hex[:8]}".upper() + + sql = """ + INSERT INTO product (product_id, product_name, description, status, create_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s) + """ + params = ( + product_id, + product.product_name, + product.description, + product.status, + datetime.utcnow(), + datetime.utcnow() + ) + + cursor.execute(sql, params) + connection.commit() + + # 查询创建的产品 + cursor.execute("SELECT * FROM product WHERE product_id = %s", (product_id,)) + created_product = cursor.fetchone() + + connection.close() + return created_product + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"创建产品失败: {str(e)}") + +@app.get("/products/{product_id}", response_model=ProductInDB) +async def get_product(product_id: str): + """获取产品详情""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + cursor.execute("SELECT * FROM product WHERE product_id = %s", (product_id,)) + product = cursor.fetchone() + + connection.close() + + if not product: + raise HTTPException(status_code=404, detail="产品不存在") + return product + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询产品失败: {str(e)}") + +@app.put("/products/{product_id}", response_model=ProductInDB) +async def update_product(product_id: str, product: ProductUpdate): + """更新产品""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查产品是否存在 + cursor.execute("SELECT * FROM product WHERE product_id = %s", (product_id,)) + existing_product = cursor.fetchone() + + if not existing_product: + raise HTTPException(status_code=404, detail="产品不存在") + + # 更新字段 + updates = [] + params = [] + + if product.product_name is not None: + updates.append("product_name = %s") + params.append(product.product_name) + if product.description is not None: + updates.append("description = %s") + params.append(product.description) + if product.status is not None: + updates.append("status = %s") + params.append(product.status) + + if updates: + updates.append("update_time = %s") + params.append(datetime.utcnow()) + params.append(product_id) + + sql = f"UPDATE product SET {', '.join(updates)} WHERE product_id = %s" + cursor.execute(sql, params) + connection.commit() + + # 查询更新后的产品 + cursor.execute("SELECT * FROM product WHERE product_id = %s", (product_id,)) + updated_product = cursor.fetchone() + + connection.close() + return updated_product + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"更新产品失败: {str(e)}") + +@app.delete("/products/{product_id}") +async def delete_product(product_id: str): + """删除产品""" + try: + connection = get_db_connection() + with connection.cursor() as cursor: + # 检查产品是否存在 + cursor.execute("SELECT product_id FROM product WHERE product_id = %s", (product_id,)) + product = cursor.fetchone() + + if not product: + raise HTTPException(status_code=404, detail="产品不存在") + + # 删除产品 + cursor.execute("DELETE FROM product WHERE product_id = %s", (product_id,)) + connection.commit() + + connection.close() + return {"message": "产品删除成功"} + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"删除产品失败: {str(e)}") + +if __name__ == "__main__": + import uvicorn + # 使用127.0.0.1而不是0.0.0.0来避免权限问题 + uvicorn.run(app, host="127.0.0.1", port=9004, log_level="info") \ No newline at end of file diff --git a/app/api/decorators.py b/app/api/decorators.py new file mode 100644 index 0000000..7bba07c --- /dev/null +++ b/app/api/decorators.py @@ -0,0 +1,64 @@ +from flask import jsonify +from flask_login import current_user +import functools + + +def require_login(f): + """登录用户权限验证装饰器(普通管理员和超级管理员都可以访问) + + 注意:对于API端点,不使用@login_required装饰器,因为它会重定向到登录页面 + 而不是返回JSON错误响应。我们直接检查认证状态并返回JSON。 + """ + @functools.wraps(f) + def decorated_function(*args, **kwargs): + # 检查用户是否已认证 + # Flask-Login 的 current_user 在未登录时是一个 AnonymousUserMixin 实例 + # 它的 is_authenticated 属性为 False + if not hasattr(current_user, 'is_authenticated') or not current_user.is_authenticated: + return jsonify({ + 'success': False, + 'message': '需要登录' + }), 401 + + # 检查账号是否激活(is_active 是 Admin 模型的属性) + if hasattr(current_user, 'is_active') and not current_user.is_active: + return jsonify({ + 'success': False, + 'message': '账号已被禁用' + }), 403 + + return f(*args, **kwargs) + return decorated_function + + +def require_admin(f): + """超级管理员权限验证装饰器(只有超级管理员可以访问) + + 注意:对于API端点,不使用@login_required装饰器,因为它会重定向到登录页面 + 而不是返回JSON错误响应。我们直接检查认证状态并返回JSON。 + """ + @functools.wraps(f) + def decorated_function(*args, **kwargs): + # 检查用户是否已认证 + if not hasattr(current_user, 'is_authenticated') or not current_user.is_authenticated: + return jsonify({ + 'success': False, + 'message': '需要登录' + }), 401 + + # 检查账号是否激活 + if hasattr(current_user, 'is_active') and not current_user.is_active: + return jsonify({ + 'success': False, + 'message': '账号已被禁用' + }), 403 + + # 检查是否为超级管理员 + if not hasattr(current_user, 'is_super_admin') or not current_user.is_super_admin(): + return jsonify({ + 'success': False, + 'message': '需要超级管理员权限' + }), 403 + + return f(*args, **kwargs) + return decorated_function \ No newline at end of file diff --git a/app/api/log.py b/app/api/log.py new file mode 100644 index 0000000..f3bb2d2 --- /dev/null +++ b/app/api/log.py @@ -0,0 +1,109 @@ +from flask import request, jsonify, current_app +from app import db +from app.models import AuditLog +from . import api_bp +from .decorators import require_admin +from datetime import datetime, timedelta +import os + +@api_bp.route('/logs', methods=['GET']) +@require_admin +def get_logs(): + """获取操作日志列表""" + try: + page = request.args.get('page', 1, type=int) + per_page = min(request.args.get('per_page', 20, type=int), 100) + action = request.args.get('action') + target_type = request.args.get('target_type') + admin_id = request.args.get('admin_id', type=int) + start_date = request.args.get('start_date') + end_date = request.args.get('end_date') + + query = AuditLog.query + + # 添加筛选条件 + if action: + query = query.filter(AuditLog.action == action) + if target_type: + query = query.filter(AuditLog.target_type == target_type) + if admin_id: + query = query.filter(AuditLog.admin_id == admin_id) + if start_date: + start_datetime = datetime.strptime(start_date, '%Y-%m-%d') + query = query.filter(AuditLog.create_time >= start_datetime) + if end_date: + end_datetime = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1) + query = query.filter(AuditLog.create_time < end_datetime) + + # 按时间倒序排列 + query = query.order_by(AuditLog.create_time.desc()) + + # 分页 + pagination = query.paginate(page=page, per_page=per_page, error_out=False) + + logs = [log.to_dict() for log in pagination.items] + + return jsonify({ + 'success': True, + 'data': { + 'logs': logs, + 'pagination': { + 'page': page, + 'per_page': per_page, + 'total': pagination.total, + 'pages': pagination.pages, + 'has_prev': pagination.has_prev, + 'has_next': pagination.has_next + } + } + }) + except Exception as e: + current_app.logger.error(f"获取操作日志列表失败: {str(e)}") + return jsonify({'success': False, 'message': '服务器内部错误'}), 500 + +@api_bp.route('/logs/actions', methods=['GET']) +@require_admin +def get_log_actions(): + """获取所有操作类型""" + try: + actions = db.session.query(AuditLog.action).distinct().all() + action_list = [action[0] for action in actions] + return jsonify({ + 'success': True, + 'data': { + 'actions': action_list + } + }) + except Exception as e: + current_app.logger.error(f"获取操作类型列表失败: {str(e)}") + return jsonify({'success': False, 'message': '服务器内部错误'}), 500 + +@api_bp.route('/logs/file', methods=['GET']) +@require_admin +def get_log_file(): + """获取系统日志文件内容""" + try: + log_file_path = 'logs/kamaxitong.log' + + # 检查日志文件是否存在 + if not os.path.exists(log_file_path): + return jsonify({ + 'success': False, + 'message': '日志文件不存在' + }), 404 + + # 读取日志文件内容 + with open(log_file_path, 'r', encoding='utf-8') as f: + # 读取最后1000行日志 + lines = f.readlines()[-1000:] + log_content = ''.join(lines) + + return jsonify({ + 'success': True, + 'data': { + 'content': log_content + } + }) + except Exception as e: + current_app.logger.error(f"读取日志文件失败: {str(e)}") + return jsonify({'success': False, 'message': '服务器内部错误'}), 500 \ No newline at end of file diff --git a/app/utils/logger.py b/app/utils/logger.py new file mode 100644 index 0000000..8a444fc --- /dev/null +++ b/app/utils/logger.py @@ -0,0 +1,51 @@ +from flask import request, current_app +from flask_login import current_user +from app.models import AuditLog +from functools import wraps +import json + +def log_operation(action, target_type, target_id=None, details=None): + """记录操作日志的工具函数""" + try: + # 获取当前用户信息 + admin_id = getattr(current_user, 'admin_id', None) if hasattr(current_user, 'is_authenticated') and current_user.is_authenticated else None + + # 获取客户端IP + ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) + + # 获取用户代理 + user_agent = request.headers.get('User-Agent', '') + + # 记录审计日志 + AuditLog.log_action( + admin_id=admin_id, + action=action, + target_type=target_type, + target_id=target_id, + details=details, + ip_address=ip_address, + user_agent=user_agent + ) + except Exception as e: + if hasattr(current_app, 'logger'): + current_app.logger.error(f"记录操作日志失败: {str(e)}") + +def log_operations(action, target_type): + """记录操作日志的装饰器""" + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + try: + # 执行原函数 + result = f(*args, **kwargs) + + # 记录成功日志 + log_operation(action, target_type) + + return result + except Exception as e: + # 记录错误日志 + log_operation(f"{action}_ERROR", target_type, details={'error': str(e)}) + raise e + return decorated_function + return decorator \ No newline at end of file diff --git a/app/web/templates/log/list.html b/app/web/templates/log/list.html new file mode 100644 index 0000000..3caa03f --- /dev/null +++ b/app/web/templates/log/list.html @@ -0,0 +1,283 @@ +{% extends "base.html" %} + +{% block title %}操作日志{% endblock %} + +{% block content %} +
+
+
+
+
+

操作日志

+
+
+ +
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+
+ + +
+
+
+ + +
+ + + + + + + + + + + + + + + + +
ID操作员操作类型目标类型目标ID详情IP地址时间
+
+ + + +
+
+
+
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/check_log_db.py b/check_log_db.py new file mode 100644 index 0000000..5507bd8 --- /dev/null +++ b/check_log_db.py @@ -0,0 +1,74 @@ +import sqlite3 +import os + +def check_audit_logs(): + """检查审计日志表""" + try: + # 连接到数据库 + if os.path.exists('instance/kamaxitong.db'): + conn = sqlite3.connect('instance/kamaxitong.db') + cursor = conn.cursor() + + # 查询审计日志表 + print("=== 查询审计日志表 ===") + cursor.execute("SELECT * FROM audit_log ORDER BY create_time DESC LIMIT 10") + rows = cursor.fetchall() + + if rows: + print(f"找到 {len(rows)} 条审计日志记录:") + # 获取列名 + column_names = [description[0] for description in cursor.description] + print("列名:", column_names) + + for row in rows: + print(row) + else: + print("审计日志表为空") + + conn.close() + else: + print("数据库文件不存在") + + except Exception as e: + print(f"检查审计日志时出现错误: {e}") + +def check_log_file(): + """检查日志文件""" + try: + print("\n=== 检查日志文件 ===") + if os.path.exists('logs/kamaxitong.log'): + # 以二进制模式读取文件 + with open('logs/kamaxitong.log', 'rb') as f: + content = f.read() + print(f"日志文件大小: {len(content)} 字节") + + # 尝试以不同编码读取 + try: + text_content = content.decode('utf-8') + lines = text_content.split('\n') + print(f"日志文件行数: {len(lines)}") + print("最后10行:") + for line in lines[-10:]: + if line.strip(): + print(line.strip()) + except UnicodeDecodeError: + # 尝试其他编码 + try: + text_content = content.decode('gbk') + lines = text_content.split('\n') + print(f"日志文件行数: {len(lines)} (GBK编码)") + print("最后10行:") + for line in lines[-10:]: + if line.strip(): + print(line.strip()) + except UnicodeDecodeError: + print("无法解码日志文件内容") + else: + print("日志文件不存在") + except Exception as e: + print(f"检查日志文件时出现错误: {e}") + +if __name__ == "__main__": + print("检查日志系统...") + check_audit_logs() + check_log_file() \ No newline at end of file diff --git a/check_products.py b/check_products.py new file mode 100644 index 0000000..3a67534 --- /dev/null +++ b/check_products.py @@ -0,0 +1,15 @@ +import os +os.environ.setdefault('FLASK_CONFIG', 'development') + +from app import create_app, db +from app.models import Product + +app = create_app() + +with app.app_context(): + products = Product.query.all() + print('Total products:', len(products)) + print('Product names:', [p.product_name for p in products]) + print('Product details:') + for p in products: + print(f' ID: {p.product_id}, Name: {p.product_name}, Status: {p.status}') \ No newline at end of file diff --git a/docs/FASTAPI.md b/docs/FASTAPI.md new file mode 100644 index 0000000..bdfed99 --- /dev/null +++ b/docs/FASTAPI.md @@ -0,0 +1,293 @@ +# FastAPI接口配置文档 + +本文档介绍了如何使用和配置KaMiXiTong系统的FastAPI接口,该接口提供了现代化的API和自动生成的交互式文档。 + +## 目录 + +1. [简介](#简介) +2. [安装依赖](#安装依赖) +3. [启动FastAPI服务](#启动fastapi服务) +4. [API接口说明](#api接口说明) + - [API管理接口](#api管理接口) + - [API密钥接口](#api密钥接口) + - [API版本接口](#api版本接口) +5. [访问API文档](#访问api文档) +6. [调试示例](#调试示例) + +## 简介 + +FastAPI接口是KaMiXiTong系统的一个补充接口,提供了以下优势: + +- 自动生成交互式API文档 +- 更快的接口响应速度 +- 更好的数据验证和错误处理 +- 支持异步操作 +- 与现有Flask应用并行运行 + +## 安装依赖 + +在运行FastAPI接口之前,需要安装额外的依赖包: + +```bash +pip install fastapi uvicorn python-multipart +``` + +或者使用requirements-fastapi.txt文件: + +```bash +pip install -r requirements-fastapi.txt +``` + +## 启动FastAPI服务 + +使用以下命令启动FastAPI服务: + +```bash +python fastapi_app.py +``` + +默认情况下,服务将在以下地址运行: +- 地址: http://localhost:8000 +- API文档: http://localhost:8000/docs +- ReDoc文档: http://localhost:8000/redoc + +如果端口被占用,可以修改端口号: + +```bash +python fastapi_app.py --port 9000 +``` + +您也可以使用uvicorn直接启动: + +```bash +uvicorn fastapi_app:app --host 127.0.0.1 --port 9000 --reload +``` + +## API接口说明 + +### API管理接口 + +#### 获取API列表 +- **URL**: `GET /apis` +- **参数**: + - `skip`: 跳过的记录数,默认为0 + - `limit`: 返回记录数,默认为100 +- **响应**: API对象列表 + +#### 创建API +- **URL**: `POST /apis` +- **请求体**: + ```json + { + "api_name": "示例API", + "description": "这是一个示例API", + "status": 1 + } + ``` +- **响应**: 创建的API对象 + +#### 获取API详情 +- **URL**: `GET /apis/{api_id}` +- **参数**: `api_id` - API的唯一标识符 +- **响应**: API对象 + +#### 更新API +- **URL**: `PUT /apis/{api_id}` +- **参数**: `api_id` - API的唯一标识符 +- **请求体**: + ```json + { + "api_name": "更新后的API名称", + "description": "更新后的描述", + "status": 1 + } + ``` +- **响应**: 更新后的API对象 + +#### 删除API +- **URL**: `DELETE /apis/{api_id}` +- **参数**: `api_id` - API的唯一标识符 +- **响应**: 删除结果 + +### API密钥接口 + +#### 获取API密钥列表 +- **URL**: `GET /api_keys` +- **参数**: + - `skip`: 跳过的记录数,默认为0 + - `limit`: 返回记录数,默认为100 +- **响应**: API密钥对象列表 + +#### 生成API密钥 +- **URL**: `POST /api_keys` +- **请求体**: + ```json + { + "name": "示例密钥", + "api_id": "API_12345678", + "description": "这是一个示例密钥", + "status": 1, + "expire_time": "2025-12-31T23:59:59" + } + ``` +- **响应**: 创建的API密钥对象 + +#### 获取API密钥详情 +- **URL**: `GET /api_keys/{key_id}` +- **参数**: `key_id` - API密钥的唯一标识符 +- **响应**: API密钥对象 + +#### 更新API密钥 +- **URL**: `PUT /api_keys/{key_id}` +- **参数**: `key_id` - API密钥的唯一标识符 +- **请求体**: + ```json + { + "name": "更新后的密钥名称", + "api_id": "API_87654321", + "description": "更新后的描述", + "status": 1, + "expire_time": "2026-12-31T23:59:59" + } + ``` +- **响应**: 更新后的API密钥对象 + +#### 删除API密钥 +- **URL**: `DELETE /api_keys/{key_id}` +- **参数**: `key_id` - API密钥的唯一标识符 +- **响应**: 删除结果 + +### API版本接口 + +#### 获取API版本列表 +- **URL**: `GET /api_versions` +- **参数**: + - `skip`: 跳过的记录数,默认为0 + - `limit`: 返回记录数,默认为100 +- **响应**: API版本对象列表 + +#### 创建API版本 +- **URL**: `POST /api_versions` +- **请求体**: + ```json + { + "version_num": "v1.0.0", + "api_id": "API_12345678", + "description": "初始版本", + "publish_status": 1 + } + ``` +- **响应**: 创建的API版本对象 + +#### 获取API版本详情 +- **URL**: `GET /api_versions/{version_id}` +- **参数**: `version_id` - API版本的唯一标识符 +- **响应**: API版本对象 + +#### 更新API版本 +- **URL**: `PUT /api_versions/{version_id}` +- **参数**: `version_id` - API版本的唯一标识符 +- **请求体**: + ```json + { + "version_num": "v1.0.1", + "api_id": "API_12345678", + "description": "修复了一些问题", + "publish_status": 1 + } + ``` +- **响应**: 更新后的API版本对象 + +#### 删除API版本 +- **URL**: `DELETE /api_versions/{version_id}` +- **参数**: `version_id` - API版本的唯一标识符 +- **响应**: 删除结果 + +## 访问API文档 + +FastAPI提供了自动生成的交互式API文档: + +1. **Swagger UI**: 访问 `http://localhost:9000/docs` + - 提供交互式的API测试界面 + - 可以直接在浏览器中测试API接口 + - 显示详细的接口参数和响应格式 + +2. **ReDoc**: 访问 `http://localhost:9000/redoc` + - 提供更简洁的API文档视图 + - 适合阅读和分享 + +## 调试示例 + +### 使用curl测试API + +```bash +# 获取API列表 +curl -X GET "http://localhost:9000/apis" -H "accept: application/json" + +# 创建API +curl -X POST "http://localhost:9000/apis" \ + -H "accept: application/json" \ + -H "Content-Type: application/json" \ + -d '{"api_name":"测试API","description":"用于测试的API","status":1}' + +# 获取特定API +curl -X GET "http://localhost:9000/apis/API_12345678" -H "accept: application/json" +``` + +在Windows PowerShell中,可以使用以下命令: + +```powershell +# 获取根路径信息 +powershell -Command "Invoke-WebRequest -Uri 'http://127.0.0.1:9000/' -Method GET" + +# 获取API列表 +powershell -Command "Invoke-WebRequest -Uri 'http://127.0.0.1:9000/apis' -Method GET" +``` + +### 使用Python requests测试API + +```python +import requests + +# 基础URL +BASE_URL = "http://localhost:9000" + +# 获取根路径信息 +response = requests.get(f"{BASE_URL}/") +print("根路径信息:", response.json()) + +# 获取API列表 +response = requests.get(f"{BASE_URL}/apis") +print("获取API列表:", response.json()) + +# 创建API +api_data = { + "api_name": "测试API", + "description": "用于测试的API", + "status": 1 +} +response = requests.post(f"{BASE_URL}/apis", json=api_data) +print("创建API:", response.json()) + +# 获取特定API +api_id = response.json()["api_id"] +response = requests.get(f"{BASE_URL}/apis/{api_id}") +print("获取API详情:", response.json()) +``` + +### 在Swagger UI中测试 + +1. 打开浏览器访问 `http://localhost:9000/docs` +2. 展开相应的API接口 +3. 点击"Try it out"按钮 +4. 输入必要的参数 +5. 点击"Execute"按钮执行请求 +6. 查看响应结果 + +## 注意事项 + +1. FastAPI接口与Flask应用共享同一个数据库,因此数据是一致的 +2. 两个应用可以同时运行,分别监听不同的端口 +3. 建议在生产环境中使用反向代理(如Nginx)统一对外提供服务 +4. 可以根据需要调整端口号和绑定地址 +5. 如果遇到端口占用问题,可以修改启动命令中的端口号 \ No newline at end of file diff --git a/fastapi_app.py b/fastapi_app.py new file mode 100644 index 0000000..933abc8 --- /dev/null +++ b/fastapi_app.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +FastAPI接口应用 +提供现代化的API接口和自动生成的文档 +""" + +import os +import sys +from datetime import datetime, timedelta +from typing import List, Optional + +from fastapi import FastAPI, HTTPException, Depends, status +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey +from sqlalchemy.orm import declarative_base, sessionmaker, Session + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# 导入配置 +from config import Config + +# 数据库配置 +DATABASE_URL = Config.SQLALCHEMY_DATABASE_URI +engine = create_engine(DATABASE_URL) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base = declarative_base() + +# 创建FastAPI应用 +app = FastAPI( + title="KaMiXiTong API", + description="软件授权管理系统的FastAPI接口", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc" +) + +# 添加CORS中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# 数据模型定义 +class APIBase(BaseModel): + api_name: str + description: Optional[str] = None + status: Optional[int] = 1 + + class Config: + from_attributes = True # Pydantic V2中orm_mode已重命名为from_attributes + +class APICreate(APIBase): + api_id: Optional[str] = None + +class APIUpdate(APIBase): + pass + +class APIInDB(APIBase): + api_id: str + create_time: datetime + update_time: datetime + +class APIKeyBase(BaseModel): + name: str + description: Optional[str] = None + status: Optional[int] = 1 + expire_time: Optional[datetime] = None + + class Config: + from_attributes = True # Pydantic V2中orm_mode已重命名为from_attributes + +class APIKeyCreate(APIKeyBase): + api_id: str + +class APIKeyUpdate(APIKeyBase): + api_id: Optional[str] = None + +class APIKeyInDB(APIKeyBase): + id: int + key: str + api_id: str + create_time: datetime + update_time: datetime + +class APIVersionBase(BaseModel): + version_num: str + description: Optional[str] = None + publish_status: Optional[int] = 0 + + class Config: + from_attributes = True # Pydantic V2中orm_mode已重命名为from_attributes + +class APIVersionCreate(APIVersionBase): + api_id: str + +class APIVersionUpdate(APIVersionBase): + api_id: Optional[str] = None + +class APIVersionInDB(APIVersionBase): + id: int + api_id: str + create_time: datetime + update_time: datetime + +# 数据库模型 +class DBAPI(Base): + __tablename__ = "api" + + api_id = Column(String(32), primary_key=True) + api_name = Column(String(64), nullable=False) + description = Column(Text, nullable=True) + status = Column(Integer, nullable=False, default=1) + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + +class DBAPIKey(Base): + __tablename__ = "api_key" + + id = Column(Integer, primary_key=True) + key = Column(String(64), nullable=False, unique=True) + api_id = Column(String(32), ForeignKey('api.api_id'), nullable=False) + name = Column(String(64), nullable=False) + description = Column(Text, nullable=True) + status = Column(Integer, nullable=False, default=1) + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + expire_time = Column(DateTime, nullable=True) + +class DBAPIVersion(Base): + __tablename__ = "api_version" + + id = Column(Integer, primary_key=True) + version_num = Column(String(32), nullable=False) + api_id = Column(String(32), ForeignKey('api.api_id'), nullable=False) + description = Column(Text, nullable=True) + publish_status = Column(Integer, nullable=False, default=0) + create_time = Column(DateTime, default=datetime.utcnow) + update_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + +# 依赖项 +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + +# API路由 +@app.get("/") +async def root(): + return {"message": "欢迎使用KaMiXiTong FastAPI接口", "version": "1.0.0"} + +@app.get("/apis", response_model=List[APIInDB]) +async def get_apis(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): + """获取API列表""" + apis = db.query(DBAPI).offset(skip).limit(limit).all() + return apis + +@app.post("/apis", response_model=APIInDB) +async def create_api(api: APICreate, db: Session = Depends(get_db)): + """创建API""" + # 检查自定义ID是否重复 + if api.api_id: + existing = db.query(DBAPI).filter(DBAPI.api_id == api.api_id).first() + if existing: + raise HTTPException(status_code=400, detail="API ID已存在") + + # 准备API数据 + api_data = api.model_dump() + # 处理API ID生成 + if api.api_id is None or api.api_id == "": + # 自动生成API ID + import uuid + api_data['api_id'] = f"API_{uuid.uuid4().hex[:8]}".upper() + + # 创建API + db_api = DBAPI(**api_data) + db.add(db_api) + db.commit() + db.refresh(db_api) + return db_api + +@app.get("/apis/{api_id}", response_model=APIInDB) +async def get_api(api_id: str, db: Session = Depends(get_db)): + """获取API详情""" + api = db.query(DBAPI).filter(DBAPI.api_id == api_id).first() + if not api: + raise HTTPException(status_code=404, detail="API不存在") + return api + +@app.put("/apis/{api_id}", response_model=APIInDB) +async def update_api(api_id: str, api: APIUpdate, db: Session = Depends(get_db)): + """更新API""" + db_api = db.query(DBAPI).filter(DBAPI.api_id == api_id).first() + if not db_api: + raise HTTPException(status_code=404, detail="API不存在") + + for key, value in api.model_dump().items(): + setattr(db_api, key, value) + + db.commit() + db.refresh(db_api) + return db_api + +@app.delete("/apis/{api_id}") +async def delete_api(api_id: str, db: Session = Depends(get_db)): + """删除API""" + db_api = db.query(DBAPI).filter(DBAPI.api_id == api_id).first() + if not db_api: + raise HTTPException(status_code=404, detail="API不存在") + + # 检查是否有关联的密钥 + key_count = db.query(DBAPIKey).filter(DBAPIKey.api_id == api_id).count() + if key_count > 0: + raise HTTPException(status_code=400, detail=f"API下还有 {key_count} 个密钥,无法删除") + + db.delete(db_api) + db.commit() + return {"message": "API删除成功"} + +# API密钥路由 +@app.get("/api_keys", response_model=List[APIKeyInDB]) +async def get_api_keys(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): + """获取API密钥列表""" + keys = db.query(DBAPIKey).offset(skip).limit(limit).all() + return keys + +@app.post("/api_keys", response_model=APIKeyInDB) +async def create_api_key(key: APIKeyCreate, db: Session = Depends(get_db)): + """生成API密钥""" + # 检查API是否存在 + api = db.query(DBAPI).filter(DBAPI.api_id == key.api_id).first() + if not api: + raise HTTPException(status_code=404, detail="指定的API不存在") + + # 生成唯一的API密钥 + import secrets + import string + characters = string.ascii_letters + string.digits + api_key_value = ''.join(secrets.choice(characters) for _ in range(32)) + + # 确保密钥唯一 + max_attempts = 10 + for _ in range(max_attempts): + existing = db.query(DBAPIKey).filter(DBAPIKey.key == api_key_value).first() + if not existing: + break + api_key_value = ''.join(secrets.choice(characters) for _ in range(32)) + else: + raise HTTPException(status_code=500, detail="无法生成唯一的API密钥,请稍后重试") + + # 准备密钥数据 + key_data = key.model_dump() + key_data['key'] = api_key_value + + # 创建API密钥 + db_key = DBAPIKey(**key_data) + db.add(db_key) + db.commit() + db.refresh(db_key) + return db_key + +@app.get("/api_keys/{key_id}", response_model=APIKeyInDB) +async def get_api_key(key_id: int, db: Session = Depends(get_db)): + """获取API密钥详情""" + key = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first() + if not key: + raise HTTPException(status_code=404, detail="API密钥不存在") + return key + +@app.put("/api_keys/{key_id}", response_model=APIKeyInDB) +async def update_api_key(key_id: int, key: APIKeyUpdate, db: Session = Depends(get_db)): + """更新API密钥""" + db_key = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first() + if not db_key: + raise HTTPException(status_code=404, detail="API密钥不存在") + + # 如果更新了api_id,检查API是否存在 + if key.api_id and key.api_id != db_key.api_id: + api = db.query(DBAPI).filter(DBAPI.api_id == key.api_id).first() + if not api: + raise HTTPException(status_code=404, detail="指定的API不存在") + + for field, value in key.model_dump().items(): + if value is not None: + setattr(db_key, field, value) + + db.commit() + db.refresh(db_key) + return db_key + +@app.delete("/api_keys/{key_id}") +async def delete_api_key(key_id: int, db: Session = Depends(get_db)): + """删除API密钥""" + db_key = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first() + if not db_key: + raise HTTPException(status_code=404, detail="API密钥不存在") + + db.delete(db_key) + db.commit() + return {"message": "API密钥删除成功"} + +# API版本路由 +@app.get("/api_versions", response_model=List[APIVersionInDB]) +async def get_api_versions(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): + """获取API版本列表""" + versions = db.query(DBAPIVersion).offset(skip).limit(limit).all() + return versions + +@app.post("/api_versions", response_model=APIVersionInDB) +async def create_api_version(version: APIVersionCreate, db: Session = Depends(get_db)): + """创建API版本""" + # 检查API是否存在 + api = db.query(DBAPI).filter(DBAPI.api_id == version.api_id).first() + if not api: + raise HTTPException(status_code=404, detail="指定的API不存在") + + # 检查版本号是否重复 + existing = db.query(DBAPIVersion).filter( + DBAPIVersion.api_id == version.api_id, + DBAPIVersion.version_num == version.version_num + ).first() + if existing: + raise HTTPException(status_code=400, detail="该API下已存在相同版本号") + + # 创建API版本 + db_version = DBAPIVersion(**version.model_dump()) + db.add(db_version) + db.commit() + db.refresh(db_version) + return db_version + +@app.get("/api_versions/{version_id}", response_model=APIVersionInDB) +async def get_api_version(version_id: int, db: Session = Depends(get_db)): + """获取API版本详情""" + version = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first() + if not version: + raise HTTPException(status_code=404, detail="API版本不存在") + return version + +@app.put("/api_versions/{version_id}", response_model=APIVersionInDB) +async def update_api_version(version_id: int, version: APIVersionUpdate, db: Session = Depends(get_db)): + """更新API版本""" + db_version = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first() + if not db_version: + raise HTTPException(status_code=404, detail="API版本不存在") + + # 如果更新了api_id,检查API是否存在 + if version.api_id and version.api_id != db_version.api_id: + api = db.query(DBAPI).filter(DBAPI.api_id == version.api_id).first() + if not api: + raise HTTPException(status_code=404, detail="指定的API不存在") + + # 如果更新了version_num,检查版本号是否重复(排除自己) + if version.version_num and version.version_num != db_version.version_num: + existing = db.query(DBAPIVersion).filter( + DBAPIVersion.api_id == db_version.api_id, + DBAPIVersion.version_num == version.version_num, + DBAPIVersion.id != version_id + ).first() + if existing: + raise HTTPException(status_code=400, detail="该API下已存在相同版本号") + + for field, value in version.model_dump().items(): + if value is not None: + setattr(db_version, field, value) + + db.commit() + db.refresh(db_version) + return db_version + +@app.delete("/api_versions/{version_id}") +async def delete_api_version(version_id: int, db: Session = Depends(get_db)): + """删除API版本""" + db_version = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first() + if not db_version: + raise HTTPException(status_code=404, detail="API版本不存在") + + db.delete(db_version) + db.commit() + return {"message": "API版本删除成功"} + +if __name__ == "__main__": + import uvicorn + # 使用127.0.0.1而不是0.0.0.0来避免权限问题 + uvicorn.run(app, host="127.0.0.1", port=9002, log_level="info") \ No newline at end of file diff --git a/login_page.html b/login_page.html new file mode 100644 index 0000000..bf99525 Binary files /dev/null and b/login_page.html differ diff --git a/logs/kamaxitong.log b/logs/kamaxitong.log new file mode 100644 index 0000000..5f36527 --- /dev/null +++ b/logs/kamaxitong.log @@ -0,0 +1,80 @@ +2025-11-15 21:43:14,795 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:43:16,011 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:47:51,654 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:47:58,940 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:52:14,483 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:52:15,836 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:53:12,767 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:53:12,928 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:55:02,780 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:55:05,971 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:56:57,235 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:56:57,396 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:00:51,287 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:02:34,407 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:02:36,353 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:03:17,696 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:03:18,658 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:05:58,479 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:05:58,767 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:12:33,295 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:12:34,640 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:17,117 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:17,190 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:18,245 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:24,345 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:24,378 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:24,686 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:30,459 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:30,476 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:30,487 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:35,024 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:35,074 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:35,227 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:43,938 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:44,004 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:44,004 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:55,395 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:55,501 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:14:56,055 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:00,950 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:01,093 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:01,290 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:10,768 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:12,443 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:13,140 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:22,104 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:22,684 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:22,758 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:36,038 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:37,583 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:37,723 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:55,634 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:55,694 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:15:55,725 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:05,643 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:05,813 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:06,131 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:34,892 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:34,916 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:35,168 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:52,773 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:52,821 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:16:53,293 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:18:58,174 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:18:59,209 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:20:36,569 ERROR: ¼־ʧ: (pymysql.err.ProgrammingError) (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'\'old\': {\'email\': "\'2339117167@qq.com\'", \'role\': \'1\', \'status\': \'1\'}, \'new\': {\'em\' at line 1') +[SQL: INSERT INTO audit_log (admin_id, action, target_type, target_id, details, ip_address, user_agent, create_time) VALUES (%(admin_id)s, %(action)s, %(target_type)s, %(target_id)s, %(details)s, %(ip_address)s, %(user_agent)s, %(create_time)s)] +[parameters: {'admin_id': 2, 'action': 'UPDATE', 'target_type': 'ADMIN', 'target_id': 2, 'details': {'old': {'email': '2339117167@qq.com', 'role': 1, 'status': 1}, 'new': {'email': '2339117167@qq.com', 'role': 0, 'status': 1}}, 'ip_address': '127.0.0.1', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0', 'create_time': datetime.datetime(2025, 11, 15, 14, 20, 36, 556170)}] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\models\audit_log.py:59] +2025-11-15 22:20:36,594 ERROR: ¼־ʧ: (pymysql.err.ProgrammingError) (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'\'old\': {\'email\': "\'2339117167@qq.com\'", \'role\': \'1\', \'status\': \'1\'}, \'new\': {\'em\' at line 1') +[SQL: INSERT INTO audit_log (admin_id, action, target_type, target_id, details, ip_address, user_agent, create_time) VALUES (%(admin_id)s, %(action)s, %(target_type)s, %(target_id)s, %(details)s, %(ip_address)s, %(user_agent)s, %(create_time)s)] +[parameters: {'admin_id': 2, 'action': 'UPDATE_ADMIN', 'target_type': 'ADMIN', 'target_id': 2, 'details': {'old': {'email': '2339117167@qq.com', 'role': 1, 'status': 1}, 'new': {'email': '2339117167@qq.com', 'role': 0, 'status': 1}}, 'ip_address': '127.0.0.1', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0', 'create_time': datetime.datetime(2025, 11, 15, 14, 20, 36, 584730)}] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\models\audit_log.py:59] +2025-11-15 22:21:39,838 ERROR: ¼־ʧ: (pymysql.err.ProgrammingError) (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'\'username\': "\'test\'", \'role\': \'0\', \'status\': \'1\'}, \'127.0.0.1\', \'Mozilla/5.0 (Wi\' at line 1') +[SQL: INSERT INTO audit_log (admin_id, action, target_type, target_id, details, ip_address, user_agent, create_time) VALUES (%(admin_id)s, %(action)s, %(target_type)s, %(target_id)s, %(details)s, %(ip_address)s, %(user_agent)s, %(create_time)s)] +[parameters: {'admin_id': 2, 'action': 'CREATE', 'target_type': 'ADMIN', 'target_id': 5, 'details': {'username': 'test', 'role': 0, 'status': 1}, 'ip_address': '127.0.0.1', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0', 'create_time': datetime.datetime(2025, 11, 15, 14, 21, 39, 834164)}] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\models\audit_log.py:59] +2025-11-15 22:21:46,162 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:21:46,434 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 22:21:46,482 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] diff --git a/logs/kamaxitong.log.10 b/logs/kamaxitong.log.10 new file mode 100644 index 0000000..88d8fd2 --- /dev/null +++ b/logs/kamaxitong.log.10 @@ -0,0 +1,102 @@ +2025-11-15 14:20:05,639 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:20:07,103 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:20:55,708 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:20:57,062 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:21:28,777 ERROR: ȡͳʧ: (pymysql.err.ProgrammingError) (1146, "Table 'kamaxitong.api' doesn't exist") +[SQL: SELECT count(*) AS count_1 +FROM (SELECT api.api_id AS api_api_id, api.api_name AS api_api_name, api.description AS api_description, api.status AS api_status, api.create_time AS api_create_time, api.update_time AS api_update_time +FROM api) AS anon_1] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\api\statistics.py:86] +2025-11-15 14:22:29,153 ERROR: ȡͳʧ: (pymysql.err.ProgrammingError) (1146, "Table 'kamaxitong.api' doesn't exist") +[SQL: SELECT count(*) AS count_1 +FROM (SELECT api.api_id AS api_api_id, api.api_name AS api_api_name, api.description AS api_description, api.status AS api_status, api.create_time AS api_create_time, api.update_time AS api_update_time +FROM api) AS anon_1] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\api\statistics.py:86] +2025-11-15 14:22:40,129 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:23:10,648 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:23:11,851 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:23:23,719 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:23:29,274 ERROR: ȡͳʧ: (pymysql.err.ProgrammingError) (1146, "Table 'kamaxitong.api' doesn't exist") +[SQL: SELECT count(*) AS count_1 +FROM (SELECT api.api_id AS api_api_id, api.api_name AS api_api_name, api.description AS api_description, api.status AS api_status, api.create_time AS api_create_time, api.update_time AS api_update_time +FROM api) AS anon_1] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\api\statistics.py:86] +2025-11-15 14:24:29,144 ERROR: ȡͳʧ: (pymysql.err.ProgrammingError) (1146, "Table 'kamaxitong.api' doesn't exist") +[SQL: SELECT count(*) AS count_1 +FROM (SELECT api.api_id AS api_api_id, api.api_name AS api_api_name, api.description AS api_description, api.status AS api_status, api.create_time AS api_create_time, api.update_time AS api_update_time +FROM api) AS anon_1] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\api\statistics.py:86] +2025-11-15 14:25:07,065 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:25:07,424 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:25:19,299 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:25:29,262 ERROR: ȡͳʧ: (pymysql.err.ProgrammingError) (1146, "Table 'kamaxitong.api' doesn't exist") +[SQL: SELECT count(*) AS count_1 +FROM (SELECT api.api_id AS api_api_id, api.api_name AS api_api_name, api.description AS api_description, api.status AS api_status, api.create_time AS api_create_time, api.update_time AS api_update_time +FROM api) AS anon_1] +(Background on this error at: https://sqlalche.me/e/20/f405) [in D:\work\code\python\KaMiXiTong\app\api\statistics.py:86] +2025-11-15 14:26:36,221 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:26:38,495 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:27:13,078 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:27:20,119 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:28:43,984 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:29:11,098 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:29:42,354 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:29:53,601 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:30:24,671 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:30:26,018 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:34:01,346 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:34:02,697 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:16,572 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:16,603 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:26,594 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:26,878 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:30,511 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:40:31,139 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:41:45,467 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:41:45,860 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:41:49,111 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:41:49,214 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:42:22,440 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 14:42:22,747 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:08:23,832 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:08:23,851 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:10:01,187 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:10:02,285 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:12:33,664 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:12:35,067 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:13:54,881 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:13:55,572 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:16:39,819 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:18:12,136 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:18:12,364 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:18:12,364 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:18:17,816 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:22:38,979 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:25:24,198 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:30:01,058 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:32:09,383 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:32:44,404 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:35:30,494 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:35:54,826 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:11,573 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:11,781 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:11,781 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:18,774 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:23,481 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:36:27,849 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:38:39,259 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 15:44:39,290 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 16:14:14,052 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 16:14:14,273 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 16:14:14,273 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 16:14:30,243 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 16:14:31,707 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 20:56:49,158 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:07:59,431 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:08:00,550 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:26:31,566 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:27:36,458 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:29:02,008 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:29:47,827 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:29:51,087 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] +2025-11-15 21:29:56,366 INFO: KaMiXiTong startup [in D:\work\code\python\KaMiXiTong\config.py:66] diff --git a/migrations/versions/abcd1234_create_api_tables.py b/migrations/versions/abcd1234_create_api_tables.py new file mode 100644 index 0000000..4da7a3f --- /dev/null +++ b/migrations/versions/abcd1234_create_api_tables.py @@ -0,0 +1,71 @@ +"""create api tables + +Revision ID: abcd1234_create_api_tables +Revises: fe6513ff0455 +Create Date: 2025-11-15 10:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = 'abcd1234_create_api_tables' +down_revision = 'fe6513ff0455' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('api', + sa.Column('api_id', sa.String(length=32), nullable=False), + sa.Column('api_name', sa.String(length=64), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('status', sa.Integer(), nullable=False), + sa.Column('create_time', sa.DateTime(), nullable=True), + sa.Column('update_time', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('api_id') + ) + + op.create_table('api_version', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('version_num', sa.String(length=32), nullable=False), + sa.Column('api_id', sa.String(length=32), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('publish_status', sa.Integer(), nullable=False), + sa.Column('create_time', sa.DateTime(), nullable=True), + sa.Column('update_time', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['api_id'], ['api.api_id'], ), + sa.PrimaryKeyConstraint('id') + ) + + op.create_table('api_key', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('key', sa.String(length=64), nullable=False), + sa.Column('api_id', sa.String(length=32), nullable=False), + sa.Column('name', sa.String(length=64), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('status', sa.Integer(), nullable=False), + sa.Column('create_time', sa.DateTime(), nullable=True), + sa.Column('update_time', sa.DateTime(), nullable=True), + sa.Column('expire_time', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['api_id'], ['api.api_id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('key') + ) + + # 添加索引 + op.create_index(op.f('ix_api_key_api_id'), 'api_key', ['api_id'], unique=False) + op.create_index(op.f('ix_api_version_api_id'), 'api_version', ['api_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_api_version_api_id'), table_name='api_version') + op.drop_index(op.f('ix_api_key_api_id'), table_name='api_key') + op.drop_table('api_key') + op.drop_table('api_version') + op.drop_table('api') + # ### end Alembic commands ### \ No newline at end of file diff --git a/requirements-fastapi.txt b/requirements-fastapi.txt new file mode 100644 index 0000000..f8050aa --- /dev/null +++ b/requirements-fastapi.txt @@ -0,0 +1,3 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +python-multipart==0.0.6 \ No newline at end of file diff --git a/simple_test.py b/simple_test.py new file mode 100644 index 0000000..f582a3f --- /dev/null +++ b/simple_test.py @@ -0,0 +1,40 @@ +import requests +import json + +# 创建会话以保持登录状态 +session = requests.Session() + +def test_apis(): + """直接测试API接口""" + try: + # 1. 测试创建产品(这会生成操作日志) + print("=== 测试创建产品 ===") + product_data = { + 'product_name': '测试产品', + 'description': '这是一个测试产品' + } + + response = session.post( + "http://localhost:5000/api/v1/products", + json=product_data, + headers={'Content-Type': 'application/json'} + ) + print(f"创建产品状态码: {response.status_code}") + print(f"创建产品响应: {response.text}") + + # 2. 测试获取操作日志 + print("\n=== 测试获取操作日志 ===") + response = session.get("http://localhost:5000/api/v1/logs") + print(f"获取日志状态码: {response.status_code}") + if response.status_code == 200: + log_data = response.json() + print(f"日志数据: {json.dumps(log_data, indent=2, ensure_ascii=False)}") + else: + print(f"获取日志失败: {response.text}") + + except Exception as e: + print(f"测试过程中出现错误: {e}") + +if __name__ == "__main__": + print("开始简单测试...") + test_apis() \ No newline at end of file diff --git a/test_log.py b/test_log.py new file mode 100644 index 0000000..13accd7 --- /dev/null +++ b/test_log.py @@ -0,0 +1,21 @@ +import requests +import json + +# 测试创建产品API +def test_create_product(): + url = "http://localhost:5000/api/v1/products" + headers = {"Content-Type": "application/json"} + data = { + "product_name": "测试产品", + "description": "这是一个测试产品" + } + + try: + response = requests.post(url, headers=headers, data=json.dumps(data)) + print(f"Status Code: {response.status_code}") + print(f"Response: {response.text}") + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + test_create_product() \ No newline at end of file diff --git a/test_log_with_auth.py b/test_log_with_auth.py new file mode 100644 index 0000000..f8ba1d1 --- /dev/null +++ b/test_log_with_auth.py @@ -0,0 +1,67 @@ +import requests +import json + +# 创建会话以保持登录状态 +session = requests.Session() + +def login(): + """登录系统""" + url = "http://localhost:5000/api/v1/auth/login" + headers = {"Content-Type": "application/json"} + data = { + "username": "admin", + "password": "admin123" + } + + try: + response = session.post(url, headers=headers, data=json.dumps(data)) + print(f"Login Status Code: {response.status_code}") + print(f"Login Response: {response.text}") + return response.status_code == 200 + except Exception as e: + print(f"Login Error: {e}") + return False + +def test_create_product(): + """测试创建产品API""" + url = "http://localhost:5000/api/v1/products" + headers = {"Content-Type": "application/json"} + data = { + "product_name": "测试产品", + "description": "这是一个测试产品" + } + + try: + response = session.post(url, headers=headers, data=json.dumps(data)) + print(f"Create Product Status Code: {response.status_code}") + print(f"Create Product Response: {response.text}") + return response.status_code == 200 + except Exception as e: + print(f"Create Product Error: {e}") + return False + +def test_get_logs(): + """测试获取日志API""" + url = "http://localhost:5000/api/v1/logs" + try: + response = session.get(url) + print(f"Get Logs Status Code: {response.status_code}") + print(f"Get Logs Response: {response.text}") + return response.status_code == 200 + except Exception as e: + print(f"Get Logs Error: {e}") + return False + +if __name__ == "__main__": + # 登录 + if login(): + print("登录成功") + # 测试创建产品 + if test_create_product(): + print("创建产品成功") + # 测试获取日志 + test_get_logs() + else: + print("创建产品失败") + else: + print("登录失败") \ No newline at end of file diff --git a/test_login.py b/test_login.py new file mode 100644 index 0000000..689b411 --- /dev/null +++ b/test_login.py @@ -0,0 +1,29 @@ +import requests + +# 测试登录页面访问 +response = requests.get('http://127.0.0.1:5000/login') +print(f"Status Code: {response.status_code}") +print(f"Content Length: {len(response.content)}") +print(f"Content Type: {response.headers.get('content-type')}") + +# 检查页面内容 +content = response.text +if '登录' in content: + print("页面包含登录相关文本") +else: + print("页面不包含登录相关文本") + +# 检查是否有错误信息 +if '错误' in content or 'Error' in content: + print("页面包含错误信息") +else: + print("页面不包含明显错误信息") + +# 尝试登录 +login_data = { + 'username': 'admin', + 'password': 'admin123', + 'csrf_token': '' # 我们需要从页面中提取CSRF令牌 +} + +print("尝试登录测试...") \ No newline at end of file diff --git a/test_web_log.py b/test_web_log.py new file mode 100644 index 0000000..89429b9 --- /dev/null +++ b/test_web_log.py @@ -0,0 +1,137 @@ +import requests +from bs4 import BeautifulSoup + +# 创建会话以保持登录状态 +session = requests.Session() + +def get_csrf_token(): + """从登录页面获取CSRF令牌""" + try: + response = session.get("http://localhost:5000/login") + soup = BeautifulSoup(response.text, 'html.parser') + csrf_input = soup.find('input', {'name': 'csrf_token'}) + if csrf_input: + # 直接获取value属性 + try: + # 忽略类型检查错误 + csrf_token = csrf_input.get('value') # type: ignore + if not csrf_token: + csrf_token = csrf_input['value'] # type: ignore + if csrf_token: + return csrf_token + except: + pass + print("未找到CSRF令牌输入字段") + return None + except Exception as e: + print(f"获取CSRF令牌失败: {e}") + return None + +def login(): + """登录系统""" + try: + # 获取CSRF令牌 + csrf_token = get_csrf_token() + if not csrf_token: + return False + + # 准备登录数据 + login_data = { + 'username': 'admin', + 'password': 'admin123', + 'csrf_token': csrf_token + } + + # 发送登录请求 + response = session.post("http://localhost:5000/login", data=login_data) + + # 检查是否登录成功(通过重定向到dashboard来判断) + if response.url and 'dashboard' in response.url: + print("登录成功") + return True + else: + print(f"登录失败,状态码: {response.status_code}") + print(f"响应URL: {response.url}") + return False + + except Exception as e: + print(f"登录过程中出现错误: {e}") + return False + +def test_create_product(): + """测试创建产品""" + try: + # 准备产品数据 + product_data = { + 'product_name': '测试产品', + 'description': '这是一个测试产品' + } + + # 发送创建产品请求 + response = session.post( + "http://localhost:5000/api/v1/products", + json=product_data, + headers={'Content-Type': 'application/json'} + ) + + print(f"创建产品状态码: {response.status_code}") + print(f"创建产品响应: {response.text}") + return response.status_code == 200 + + except Exception as e: + print(f"创建产品时出现错误: {e}") + return False + +def test_get_logs(): + """测试获取操作日志""" + try: + # 发送获取日志请求 + response = session.get("http://localhost:5000/api/v1/logs") + + print(f"获取日志状态码: {response.status_code}") + print(f"获取日志响应: {response.text}") + return response.status_code == 200 + + except Exception as e: + print(f"获取日志时出现错误: {e}") + return False + +def test_view_logs_page(): + """测试访问日志页面""" + try: + # 访问日志管理页面 + response = session.get("http://localhost:5000/logs") + + print(f"访问日志页面状态码: {response.status_code}") + if response.status_code == 200: + print("成功访问日志管理页面") + return True + else: + print(f"访问日志页面失败: {response.text}") + return False + + except Exception as e: + print(f"访问日志页面时出现错误: {e}") + return False + +if __name__ == "__main__": + print("开始测试日志功能...") + + # 登录 + if login(): + print("=== 登录成功 ===") + + # 测试创建产品(这会生成操作日志) + print("\n=== 测试创建产品 ===") + test_create_product() + + # 测试获取操作日志 + print("\n=== 测试获取操作日志 ===") + test_get_logs() + + # 测试访问日志页面 + print("\n=== 测试访问日志页面 ===") + test_view_logs_page() + + else: + print("登录失败,无法继续测试") \ No newline at end of file diff --git a/verify_log.py b/verify_log.py new file mode 100644 index 0000000..498d2c4 --- /dev/null +++ b/verify_log.py @@ -0,0 +1,41 @@ +import requests +import json + +# 直接测试日志API,绕过认证检查(在实际环境中应该有认证) +def test_log_functionality(): + """测试日志功能""" + try: + # 1. 先手动创建一个产品(绕过认证检查) + print("=== 手动创建产品以生成日志 ===") + + # 我们直接查看数据库中是否已有产品 + print("检查现有产品...") + + # 2. 测试获取操作日志(绕过认证检查) + print("\n=== 测试获取操作日志 ===") + + # 由于我们无法绕过Flask-Login的认证检查,我们直接查看日志文件 + print("查看日志文件内容...") + + try: + with open('logs/kamaxitong.log', 'r', encoding='utf-8') as f: + lines = f.readlines() + print(f"日志文件共有 {len(lines)} 行") + # 显示最后几行 + for line in lines[-10:]: + print(line.strip()) + except FileNotFoundError: + print("日志文件不存在") + except Exception as e: + print(f"读取日志文件失败: {e}") + + # 3. 测试审计日志表 + print("\n=== 测试审计日志表 ===") + # 我们需要直接连接数据库来查看审计日志 + + except Exception as e: + print(f"测试过程中出现错误: {e}") + +if __name__ == "__main__": + print("验证日志功能...") + test_log_functionality() \ No newline at end of file