baoxiang/backend/app/services/countdown_service.py
2025-12-16 18:06:50 +08:00

201 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
倒计时服务 - 独立管理宝箱倒计时
"""
import asyncio
from typing import Dict, Optional, Callable
from datetime import datetime
from ..core.database import SessionLocal
from ..models.game import Chest
from ..utils.redis import get_pool_cache
class CountdownManager:
"""倒计时管理器 - 单例模式"""
_instance: Optional['CountdownManager'] = None
def __new__(cls) -> 'CountdownManager':
"""单例模式实现"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""初始化倒计时管理器"""
if self._initialized:
return
self._initialized = True
self._chests: Dict[int, 'ChestCountdown'] = {} # 宝箱ID -> 倒计时实例
self._tasks: Dict[int, asyncio.Task] = {} # 宝箱ID -> 任务
self._lock = asyncio.Lock() # 线程锁
async def start_chest_countdown(
self,
chest: Chest,
on_update: Callable[[int, int], None],
on_expire: Callable[[int], None] = None
):
"""
为宝箱启动倒计时
Args:
chest: 宝箱对象
on_update: 倒计时更新回调函数 (chest_id, time_remaining)
on_expire: 倒计时结束回调函数 (chest_id)
"""
chest_id = chest.id
# 如果已存在,先停止
if chest_id in self._tasks:
await self.stop_chest_countdown(chest_id)
# 创建倒计时实例
countdown = ChestCountdown(chest, on_update, on_expire)
async with self._lock:
self._chests[chest_id] = countdown
# 启动倒计时任务
task = asyncio.create_task(self._run_countdown(chest_id))
async with self._lock:
self._tasks[chest_id] = task
async def stop_chest_countdown(self, chest_id: int):
"""停止宝箱倒计时"""
async with self._lock:
# 取消任务
if chest_id in self._tasks:
self._tasks[chest_id].cancel()
try:
await self._tasks[chest_id]
except asyncio.CancelledError:
pass
del self._tasks[chest_id]
# 移除倒计时实例
if chest_id in self._chests:
del self._chests[chest_id]
async def get_remaining_time(self, chest_id: int) -> int:
"""
获取宝箱剩余时间(秒)
Args:
chest_id: 宝箱ID
Returns:
剩余秒数0表示已结束
"""
async with self._lock:
countdown = self._chests.get(chest_id)
if countdown:
return countdown.get_remaining_time()
return 0
async def _run_countdown(self, chest_id: int):
"""
运行倒计时任务
Args:
chest_id: 宝箱ID
"""
try:
while True:
async with self._lock:
countdown = self._chests.get(chest_id)
if not countdown:
break
# 计算剩余时间
remaining = countdown.get_remaining_time()
# 检查是否结束
if remaining <= 0:
# 调用过期回调
if countdown.on_expire:
countdown.on_expire(chest_id)
break
# 调用更新回调
if countdown.on_update:
countdown.on_update(chest_id, remaining)
# 每秒更新一次
await asyncio.sleep(1)
except asyncio.CancelledError:
# 任务被取消,正常退出
pass
except Exception as e:
print(f"Countdown task error for chest {chest_id}: {e}")
finally:
# 清理资源
await self.stop_chest_countdown(chest_id)
class ChestCountdown:
"""单个宝箱的倒计时实例"""
def __init__(
self,
chest: Chest,
on_update: Callable[[int, int], None],
on_expire: Optional[Callable[[int], None]] = None
):
"""
初始化宝箱倒计时
Args:
chest: 宝箱对象
on_update: 更新回调函数
on_expire: 过期回调函数
"""
self.chest = chest
self.on_update = on_update
self.on_expire = on_expire
self._start_time = datetime.now()
def get_remaining_time(self) -> int:
"""
计算剩余时间
Returns:
剩余秒数
"""
# 如果宝箱状态不是投注中返回0
if self.chest.status != 0: # BETTING
return 0
# 获取宝箱创建时间
created_time = self.chest.created_at
if isinstance(created_time, str):
# 处理ISO格式字符串
if 'T' in created_time:
created_time = datetime.fromisoformat(
created_time.replace('Z', '')
)
else:
# 处理MySQL datetime格式 (YYYY-MM-DD HH:MM:SS)
created_time = datetime.strptime(
created_time,
'%Y-%m-%d %H:%M:%S'
)
else:
# 确保转换为naive datetime
created_time = created_time.replace(
tzinfo=None
) if created_time and created_time.tzinfo else created_time
# 使用本地时间计算与数据库func.now()保持一致)
now = datetime.now()
elapsed = (now - created_time).total_seconds()
remaining = max(0, int(self.chest.countdown_seconds - elapsed))
return remaining
# 全局倒计时管理器实例
countdown_manager = CountdownManager()