Kamixitong/fastapi_app.py

392 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FastAPI接口应用
提供现代化的API接口和自动生成的文档
"""
import os
import sys
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 导入配置
from config import Config
# Database configuration: the connection URL is read from the project Config.
DATABASE_URL = Config.SQLALCHEMY_DATABASE_URI
# Engine shared by every session created through SessionLocal.
engine = create_engine(DATABASE_URL)
# Session factory bound to the engine. Autocommit and autoflush are disabled,
# which is why the route handlers below call db.commit() explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Create the FastAPI application. The interactive documentation pages are
# served at /docs (Swagger UI) and /redoc (ReDoc).
app = FastAPI(
    title="KaMiXiTong API",
    description="软件授权管理系统的FastAPI接口",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Register the CORS middleware.
# NOTE(review): every origin, method and header is allowed while
# allow_credentials is True. A wildcard origin combined with credentials is a
# very permissive policy; confirm it is intended and consider listing the
# trusted origins explicitly before exposing this service.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Data model definitions (Pydantic request/response schemas)
class APIBase(BaseModel):
    """Fields shared by every API schema.

    Subclasses inherit both the fields and the model configuration declared
    here.
    """

    # Pydantic V2 configuration. ``from_attributes`` (the V2 name of
    # ``orm_mode``) lets the schema be populated from objects that expose the
    # fields as attributes, such as the ORM rows returned by the handlers.
    # ``model_config`` replaces the class-based ``Config``, which is
    # deprecated in Pydantic V2.
    model_config = ConfigDict(from_attributes=True)

    # Display name of the API (required).
    api_name: str
    # Optional free-text description.
    description: Optional[str] = None
    # Integer status flag, defaults to 1.
    # NOTE(review): presumably 1 means "enabled" - confirm against callers.
    status: Optional[int] = 1
class APICreate(APIBase):
    """Request body for creating an API."""

    # Optional caller-chosen identifier; may be omitted.
    api_id: Optional[str] = None
class APIUpdate(APIBase):
    """Request body for updating an API; adds no fields beyond ``APIBase``."""
class APIInDB(APIBase):
    """Response schema for a stored API."""

    # Identifier of the API (always present on stored records).
    api_id: str
    # Creation and last-update timestamps.
    create_time: datetime
    update_time: datetime
class APIKeyBase(BaseModel):
    """Fields shared by every API-key schema.

    Subclasses inherit both the fields and the model configuration declared
    here.
    """

    # Pydantic V2 configuration. ``from_attributes`` (the V2 name of
    # ``orm_mode``) lets the schema be populated from attribute-bearing
    # objects such as ORM rows. ``model_config`` replaces the class-based
    # ``Config``, which is deprecated in Pydantic V2.
    model_config = ConfigDict(from_attributes=True)

    # Human-readable name of the key (required).
    name: str
    # Optional free-text description.
    description: Optional[str] = None
    # Integer status flag, defaults to 1.
    # NOTE(review): presumably 1 means "enabled" - confirm against callers.
    status: Optional[int] = 1
    # Optional expiry timestamp; None when no expiry is set.
    expire_time: Optional[datetime] = None
class APIKeyCreate(APIKeyBase):
    """Request body for generating an API key."""

    # Identifier of the API the new key belongs to (required).
    api_id: str
class APIKeyUpdate(APIKeyBase):
    """Request body for updating an API key."""

    # Optional new owning API; may be omitted.
    api_id: Optional[str] = None
class APIKeyInDB(APIKeyBase):
    """Response schema for a stored API key."""

    # Integer primary key of the key record.
    id: int
    # The key value itself.
    key: str
    # Identifier of the API the key belongs to.
    api_id: str
    # Creation and last-update timestamps.
    create_time: datetime
    update_time: datetime
class APIVersionBase(BaseModel):
    """Fields shared by every API-version schema.

    Subclasses inherit both the fields and the model configuration declared
    here.
    """

    # Pydantic V2 configuration. ``from_attributes`` (the V2 name of
    # ``orm_mode``) lets the schema be populated from attribute-bearing
    # objects such as ORM rows. ``model_config`` replaces the class-based
    # ``Config``, which is deprecated in Pydantic V2.
    model_config = ConfigDict(from_attributes=True)

    # Version number string (required).
    version_num: str
    # Optional free-text description.
    description: Optional[str] = None
    # Integer publish flag, defaults to 0.
    # NOTE(review): presumably 0 means "unpublished" - confirm against callers.
    publish_status: Optional[int] = 0
class APIVersionCreate(APIVersionBase):
    """Request body for creating an API version."""

    # Identifier of the API the version belongs to (required).
    api_id: str
class APIVersionUpdate(APIVersionBase):
    """Request body for updating an API version."""

    # Optional new owning API; may be omitted.
    api_id: Optional[str] = None
class APIVersionInDB(APIVersionBase):
    """Response schema for a stored API version."""

    # Integer primary key of the version record.
    id: int
    # Identifier of the API the version belongs to.
    api_id: str
    # Creation and last-update timestamps.
    create_time: datetime
    update_time: datetime
# Database models (SQLAlchemy ORM tables)
class DBAPI(Base):
    """ORM mapping for the ``api`` table."""

    __tablename__ = "api"

    # String primary key, at most 32 characters.
    api_id = Column(String(32), primary_key=True)
    api_name = Column(String(64), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(Integer, default=1, nullable=False)
    # Timestamps come from datetime.utcnow; update_time is refreshed on UPDATE.
    create_time = Column(DateTime, default=datetime.utcnow)
    update_time = Column(DateTime, onupdate=datetime.utcnow, default=datetime.utcnow)
class DBAPIKey(Base):
    """ORM mapping for the ``api_key`` table."""

    __tablename__ = "api_key"

    id = Column(Integer, primary_key=True)
    # Key value; uniqueness is enforced by the column constraint.
    key = Column(String(64), unique=True, nullable=False)
    # Owning API, referencing api.api_id.
    api_id = Column(String(32), ForeignKey("api.api_id"), nullable=False)
    name = Column(String(64), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(Integer, default=1, nullable=False)
    # Timestamps come from datetime.utcnow; update_time is refreshed on UPDATE.
    create_time = Column(DateTime, default=datetime.utcnow)
    update_time = Column(DateTime, onupdate=datetime.utcnow, default=datetime.utcnow)
    # Optional expiry; NULL when no expiry is set.
    expire_time = Column(DateTime, nullable=True)
class DBAPIVersion(Base):
    """ORM mapping for the ``api_version`` table."""

    __tablename__ = "api_version"

    id = Column(Integer, primary_key=True)
    version_num = Column(String(32), nullable=False)
    # Owning API, referencing api.api_id.
    api_id = Column(String(32), ForeignKey("api.api_id"), nullable=False)
    description = Column(Text, nullable=True)
    publish_status = Column(Integer, default=0, nullable=False)
    # Timestamps come from datetime.utcnow; update_time is refreshed on UPDATE.
    create_time = Column(DateTime, default=datetime.utcnow)
    update_time = Column(DateTime, onupdate=datetime.utcnow, default=datetime.utcnow)
# Dependencies
def get_db():
    """Yield a database session and close it when the consumer is done.

    Intended for use with ``Depends``: the session is created from
    ``SessionLocal`` and is closed whether or not the consumer raised.
    """
    # Leaving the ``with`` block closes the session, on success or on error.
    with SessionLocal() as session:
        yield session
# API routes
@app.get("/")
async def root():
    """Return a welcome message together with the API version string."""
    welcome = dict(message="欢迎使用KaMiXiTong FastAPI接口", version="1.0.0")
    return welcome
@app.get("/apis", response_model=List[APIInDB])
async def get_apis(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List APIs, one page at a time.

    Args:
        skip: Number of rows to skip before the first returned row.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``DBAPI`` rows of the requested page.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order, so consecutive pages could overlap or skip rows. Ordering by
    # creation time (primary key as tie-breaker) keeps paging deterministic.
    ordered = db.query(DBAPI).order_by(DBAPI.create_time, DBAPI.api_id)
    return ordered.offset(skip).limit(limit).all()
@app.post("/apis", response_model=APIInDB)
async def create_api(api: APICreate, db: Session = Depends(get_db)):
    """Create an API record.

    A caller-supplied ``api_id`` must not already exist; when it is missing
    or empty an identifier of the form ``API_XXXXXXXX`` is generated.

    Raises:
        HTTPException: 400 when the supplied ``api_id`` is already taken.
    """
    requested_id = api.api_id
    fields = api.model_dump()

    if requested_id:
        # Reject a custom identifier that is already in use.
        clash = db.query(DBAPI).filter(DBAPI.api_id == requested_id).first()
        if clash is not None:
            raise HTTPException(status_code=400, detail="API ID已存在")
    else:
        # No identifier supplied: derive one from the first 8 hex digits of a
        # random UUID, upper-cased.
        import uuid
        fields["api_id"] = ("API_" + uuid.uuid4().hex[:8]).upper()

    record = DBAPI(**fields)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
@app.get("/apis/{api_id}", response_model=APIInDB)
async def get_api(api_id: str, db: Session = Depends(get_db)):
    """Return the API identified by ``api_id``.

    Raises:
        HTTPException: 404 when no such API exists.
    """
    record = db.query(DBAPI).filter(DBAPI.api_id == api_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="API不存在")
    return record
@app.put("/apis/{api_id}", response_model=APIInDB)
async def update_api(api_id: str, api: APIUpdate, db: Session = Depends(get_db)):
    """Update the API identified by ``api_id``.

    Only the fields actually present in the request body are written, so
    omitting ``status`` or ``description`` keeps the stored value instead of
    overwriting it with the schema default.

    Raises:
        HTTPException: 404 when no such API exists.
    """
    db_api = db.query(DBAPI).filter(DBAPI.api_id == api_id).first()
    if db_api is None:
        raise HTTPException(status_code=404, detail="API不存在")

    # exclude_unset drops fields the client did not send; a plain model_dump()
    # would include their defaults and silently reset the stored values.
    changes = api.model_dump(exclude_unset=True)
    for field, value in changes.items():
        if field == "status" and value is None:
            # The status column is NOT NULL; an explicit null is ignored
            # rather than failing at commit time.
            continue
        setattr(db_api, field, value)

    db.commit()
    db.refresh(db_api)
    return db_api
@app.delete("/apis/{api_id}")
async def delete_api(api_id: str, db: Session = Depends(get_db)):
    """Delete the API identified by ``api_id``.

    Deletion is refused while keys or versions still reference the API.

    Raises:
        HTTPException: 404 when no such API exists; 400 when dependent keys
            or versions are still present.
    """
    db_api = db.query(DBAPI).filter(DBAPI.api_id == api_id).first()
    if db_api is None:
        raise HTTPException(status_code=404, detail="API不存在")

    # Refuse to delete while API keys still reference this API.
    key_count = db.query(DBAPIKey).filter(DBAPIKey.api_id == api_id).count()
    if key_count > 0:
        raise HTTPException(status_code=400, detail=f"API下还有 {key_count} 个密钥,无法删除")

    # api_version.api_id is also a foreign key to api.api_id. Without this
    # check the delete either fails on the constraint or leaves orphaned
    # version rows, depending on whether the database enforces foreign keys.
    version_count = db.query(DBAPIVersion).filter(DBAPIVersion.api_id == api_id).count()
    if version_count > 0:
        raise HTTPException(status_code=400, detail=f"API下还有 {version_count} 个版本,无法删除")

    db.delete(db_api)
    db.commit()
    return {"message": "API删除成功"}
# API key routes
@app.get("/api_keys", response_model=List[APIKeyInDB])
async def get_api_keys(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List API keys, one page at a time.

    Args:
        skip: Number of rows to skip before the first returned row.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``DBAPIKey`` rows of the requested page.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order; ordering by primary key keeps paging deterministic.
    ordered = db.query(DBAPIKey).order_by(DBAPIKey.id)
    return ordered.offset(skip).limit(limit).all()
@app.post("/api_keys", response_model=APIKeyInDB)
async def create_api_key(key: APIKeyCreate, db: Session = Depends(get_db)):
    """Generate and store a new API key for an existing API.

    The key value is 32 random characters drawn from ASCII letters and
    digits; up to 10 lookups are made to find a value not already stored.

    Raises:
        HTTPException: 404 when the referenced API does not exist; 500 when
            no unused key value was found within the attempt budget.
    """
    import secrets
    import string

    # The owning API must exist.
    owner = db.query(DBAPI).filter(DBAPI.api_id == key.api_id).first()
    if owner is None:
        raise HTTPException(status_code=404, detail="指定的API不存在")

    alphabet = string.ascii_letters + string.digits

    def _random_key():
        # 32 characters picked with the cryptographically secure generator.
        return ''.join(secrets.choice(alphabet) for _ in range(32))

    # Keep drawing candidates until one is unused or the budget runs out.
    candidate = _random_key()
    attempts_left = 10
    while attempts_left:
        taken = db.query(DBAPIKey).filter(DBAPIKey.key == candidate).first()
        if taken is None:
            break
        candidate = _random_key()
        attempts_left -= 1
    else:
        raise HTTPException(status_code=500, detail="无法生成唯一的API密钥请稍后重试")

    # Persist the request fields together with the generated key value.
    payload = {**key.model_dump(), "key": candidate}
    record = DBAPIKey(**payload)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
@app.get("/api_keys/{key_id}", response_model=APIKeyInDB)
async def get_api_key(key_id: int, db: Session = Depends(get_db)):
    """Return the API key whose primary key is ``key_id``.

    Raises:
        HTTPException: 404 when no such key exists.
    """
    record = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="API密钥不存在")
    return record
@app.put("/api_keys/{key_id}", response_model=APIKeyInDB)
async def update_api_key(key_id: int, key: APIKeyUpdate, db: Session = Depends(get_db)):
    """Update the API key whose primary key is ``key_id``.

    Only fields present in the request body with a non-null value are
    written. Omitting ``status`` therefore keeps the stored status instead of
    resetting it to the schema default.

    Raises:
        HTTPException: 404 when the key, or a newly referenced API, does not
            exist.
    """
    db_key = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first()
    if db_key is None:
        raise HTTPException(status_code=404, detail="API密钥不存在")

    # exclude_unset drops fields the client did not send; a plain model_dump()
    # would carry their defaults (e.g. status=1) and overwrite stored values.
    changes = {
        field: value
        for field, value in key.model_dump(exclude_unset=True).items()
        if value is not None
    }
    # An empty api_id is treated as "not provided" so that it can never be
    # written without passing the existence check below.
    if not changes.get("api_id"):
        changes.pop("api_id", None)

    # When the key is moved to another API, that API must exist.
    new_api_id = changes.get("api_id")
    if new_api_id is not None and new_api_id != db_key.api_id:
        target = db.query(DBAPI).filter(DBAPI.api_id == new_api_id).first()
        if target is None:
            raise HTTPException(status_code=404, detail="指定的API不存在")

    for field, value in changes.items():
        setattr(db_key, field, value)

    db.commit()
    db.refresh(db_key)
    return db_key
@app.delete("/api_keys/{key_id}")
async def delete_api_key(key_id: int, db: Session = Depends(get_db)):
    """Delete the API key whose primary key is ``key_id``.

    Raises:
        HTTPException: 404 when no such key exists.
    """
    record = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="API密钥不存在")

    db.delete(record)
    db.commit()
    confirmation = {"message": "API密钥删除成功"}
    return confirmation
# API version routes
@app.get("/api_versions", response_model=List[APIVersionInDB])
async def get_api_versions(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List API versions, one page at a time.

    Args:
        skip: Number of rows to skip before the first returned row.
        limit: Maximum number of rows to return.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``DBAPIVersion`` rows of the requested page.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order; ordering by primary key keeps paging deterministic.
    ordered = db.query(DBAPIVersion).order_by(DBAPIVersion.id)
    return ordered.offset(skip).limit(limit).all()
@app.post("/api_versions", response_model=APIVersionInDB)
async def create_api_version(version: APIVersionCreate, db: Session = Depends(get_db)):
    """Create a version record for an existing API.

    Raises:
        HTTPException: 404 when the referenced API does not exist; 400 when
            the API already has a version with the same number.
    """
    # The owning API must exist.
    owner = db.query(DBAPI).filter(DBAPI.api_id == version.api_id).first()
    if owner is None:
        raise HTTPException(status_code=404, detail="指定的API不存在")

    # A version number may appear only once per API.
    same_api = db.query(DBAPIVersion).filter(DBAPIVersion.api_id == version.api_id)
    duplicate = same_api.filter(DBAPIVersion.version_num == version.version_num).first()
    if duplicate is not None:
        raise HTTPException(status_code=400, detail="该API下已存在相同版本号")

    fields = version.model_dump()
    record = DBAPIVersion(**fields)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
@app.get("/api_versions/{version_id}", response_model=APIVersionInDB)
async def get_api_version(version_id: int, db: Session = Depends(get_db)):
    """Return the API version whose primary key is ``version_id``.

    Raises:
        HTTPException: 404 when no such version exists.
    """
    record = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="API版本不存在")
    return record
@app.put("/api_versions/{version_id}", response_model=APIVersionInDB)
async def update_api_version(version_id: int, version: APIVersionUpdate, db: Session = Depends(get_db)):
    """Update the API version whose primary key is ``version_id``.

    Only fields present in the request body with a non-null value are
    written, so omitting ``publish_status`` keeps the stored value instead of
    resetting it to the schema default. The uniqueness of the version number
    is checked against the API the record will belong to after the update.

    Raises:
        HTTPException: 404 when the version, or a newly referenced API, does
            not exist; 400 when the target API already has another version
            with the same number.
    """
    db_version = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first()
    if db_version is None:
        raise HTTPException(status_code=404, detail="API版本不存在")

    # exclude_unset drops fields the client did not send; a plain model_dump()
    # would carry their defaults (e.g. publish_status=0) and overwrite stored
    # values.
    changes = {
        field: value
        for field, value in version.model_dump(exclude_unset=True).items()
        if value is not None
    }
    # An empty api_id is treated as "not provided" so that it can never be
    # written without passing the existence check below.
    if not changes.get("api_id"):
        changes.pop("api_id", None)

    # The (api_id, version_num) pair the record will hold after the update.
    target_api_id = changes.get("api_id", db_version.api_id)
    target_version_num = changes.get("version_num", db_version.version_num)

    # When the version is moved to another API, that API must exist.
    api_changed = target_api_id != db_version.api_id
    if api_changed:
        target_api = db.query(DBAPI).filter(DBAPI.api_id == target_api_id).first()
        if target_api is None:
            raise HTTPException(status_code=404, detail="指定的API不存在")

    # Check for a duplicate whenever either half of the pair changes, and
    # look under the target API rather than the current one.
    if api_changed or target_version_num != db_version.version_num:
        duplicate = db.query(DBAPIVersion).filter(
            DBAPIVersion.api_id == target_api_id,
            DBAPIVersion.version_num == target_version_num,
            DBAPIVersion.id != version_id,
        ).first()
        if duplicate is not None:
            raise HTTPException(status_code=400, detail="该API下已存在相同版本号")

    for field, value in changes.items():
        setattr(db_version, field, value)

    db.commit()
    db.refresh(db_version)
    return db_version
@app.delete("/api_versions/{version_id}")
async def delete_api_version(version_id: int, db: Session = Depends(get_db)):
    """Delete the API version whose primary key is ``version_id``.

    Raises:
        HTTPException: 404 when no such version exists.
    """
    record = db.query(DBAPIVersion).filter(DBAPIVersion.id == version_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="API版本不存在")

    db.delete(record)
    db.commit()
    confirmation = {"message": "API版本删除成功"}
    return confirmation
if __name__ == "__main__":
    import uvicorn

    # Bind to 127.0.0.1 rather than 0.0.0.0 to avoid permission problems.
    server_options = {"host": "127.0.0.1", "port": 9002, "log_level": "info"}
    uvicorn.run(app, **server_options)