baoxiang/backend/app/services/countdown_service.py

201 lines
5.9 KiB
Python
Raw Normal View History

2025-12-16 18:06:50 +08:00
"""
倒计时服务 - 独立管理宝箱倒计时
"""
import asyncio
from typing import Dict, Optional, Callable
from datetime import datetime
from ..core.database import SessionLocal
from ..models.game import Chest
from ..utils.redis import get_pool_cache
class CountdownManager:
"""倒计时管理器 - 单例模式"""
_instance: Optional['CountdownManager'] = None
def __new__(cls) -> 'CountdownManager':
"""单例模式实现"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""初始化倒计时管理器"""
if self._initialized:
return
self._initialized = True
self._chests: Dict[int, 'ChestCountdown'] = {} # 宝箱ID -> 倒计时实例
self._tasks: Dict[int, asyncio.Task] = {} # 宝箱ID -> 任务
self._lock = asyncio.Lock() # 线程锁
async def start_chest_countdown(
self,
chest: Chest,
on_update: Callable[[int, int], None],
on_expire: Callable[[int], None] = None
):
"""
为宝箱启动倒计时
Args:
chest: 宝箱对象
on_update: 倒计时更新回调函数 (chest_id, time_remaining)
on_expire: 倒计时结束回调函数 (chest_id)
"""
chest_id = chest.id
# 如果已存在,先停止
if chest_id in self._tasks:
await self.stop_chest_countdown(chest_id)
# 创建倒计时实例
countdown = ChestCountdown(chest, on_update, on_expire)
async with self._lock:
self._chests[chest_id] = countdown
# 启动倒计时任务
task = asyncio.create_task(self._run_countdown(chest_id))
async with self._lock:
self._tasks[chest_id] = task
async def stop_chest_countdown(self, chest_id: int):
"""停止宝箱倒计时"""
async with self._lock:
# 取消任务
if chest_id in self._tasks:
self._tasks[chest_id].cancel()
try:
await self._tasks[chest_id]
except asyncio.CancelledError:
pass
del self._tasks[chest_id]
# 移除倒计时实例
if chest_id in self._chests:
del self._chests[chest_id]
async def get_remaining_time(self, chest_id: int) -> int:
"""
获取宝箱剩余时间
Args:
chest_id: 宝箱ID
Returns:
剩余秒数0表示已结束
"""
async with self._lock:
countdown = self._chests.get(chest_id)
if countdown:
return countdown.get_remaining_time()
return 0
async def _run_countdown(self, chest_id: int):
"""
运行倒计时任务
Args:
chest_id: 宝箱ID
"""
try:
while True:
async with self._lock:
countdown = self._chests.get(chest_id)
if not countdown:
break
# 计算剩余时间
remaining = countdown.get_remaining_time()
# 检查是否结束
if remaining <= 0:
# 调用过期回调
if countdown.on_expire:
countdown.on_expire(chest_id)
break
# 调用更新回调
if countdown.on_update:
countdown.on_update(chest_id, remaining)
# 每秒更新一次
await asyncio.sleep(1)
except asyncio.CancelledError:
# 任务被取消,正常退出
pass
except Exception as e:
print(f"Countdown task error for chest {chest_id}: {e}")
finally:
# 清理资源
await self.stop_chest_countdown(chest_id)
class ChestCountdown:
"""单个宝箱的倒计时实例"""
def __init__(
self,
chest: Chest,
on_update: Callable[[int, int], None],
on_expire: Optional[Callable[[int], None]] = None
):
"""
初始化宝箱倒计时
Args:
chest: 宝箱对象
on_update: 更新回调函数
on_expire: 过期回调函数
"""
self.chest = chest
self.on_update = on_update
self.on_expire = on_expire
self._start_time = datetime.now()
def get_remaining_time(self) -> int:
"""
计算剩余时间
Returns:
剩余秒数
"""
# 如果宝箱状态不是投注中返回0
if self.chest.status != 0: # BETTING
return 0
# 获取宝箱创建时间
created_time = self.chest.created_at
if isinstance(created_time, str):
# 处理ISO格式字符串
if 'T' in created_time:
created_time = datetime.fromisoformat(
created_time.replace('Z', '')
)
else:
# 处理MySQL datetime格式 (YYYY-MM-DD HH:MM:SS)
created_time = datetime.strptime(
created_time,
'%Y-%m-%d %H:%M:%S'
)
else:
# 确保转换为naive datetime
created_time = created_time.replace(
tzinfo=None
) if created_time and created_time.tzinfo else created_time
# 使用本地时间计算与数据库func.now()保持一致)
now = datetime.now()
elapsed = (now - created_time).total_seconds()
remaining = max(0, int(self.chest.countdown_seconds - elapsed))
return remaining
# 全局倒计时管理器实例
countdown_manager = CountdownManager()