baoxiang/backend/app/utils/redis.py

89 lines
2.2 KiB
Python
Raw Normal View History

2025-12-16 18:06:50 +08:00
"""
Redis工具
"""
import redis
from ..core.config import settings
# Create the module-level Redis connection used by the helper functions in this file.
# Connection parameters are read from the `redis` section of the application settings.
redis_settings = settings.redis
redis_client = redis.Redis(
host=redis_settings.REDIS_HOST,
port=redis_settings.REDIS_PORT,
db=redis_settings.REDIS_DB,
password=redis_settings.REDIS_PASSWORD,
# decode_responses=True makes redis-py return str instead of bytes, which is
# what lets callers look up hash fields with plain string keys.
decode_responses=True
)
def get_redis_client():
    """Return the module-level Redis client instance.

    The same client object is returned on every call; no new connection
    object is constructed here.
    """
    return redis_client
def acquire_lock(lock_key: str, lock_value: str, expire_seconds: int = 10) -> bool:
    """Try to take a distributed lock stored under ``lock_key``.

    Issues a single ``SET`` with ``nx=True`` (only set when the key is absent)
    and ``ex=expire_seconds`` (expiry in seconds).

    Args:
        lock_key: Redis key that represents the lock.
        lock_value: Value stored under the key; ``release_lock`` must be given
            the same value to remove the lock.
        expire_seconds: Lifetime of the lock key in seconds.

    Returns:
        True only when the ``SET`` call reports success. False when the key
        was not set, or when the call raised (the error is printed and
        swallowed).
    """
    try:
        was_set = redis_client.set(
            lock_key,
            lock_value,
            ex=expire_seconds,
            nx=True,
        )
    except Exception as e:
        print(f"获取锁失败: {e}")
        return False
    # Strict identity check: anything other than a literal True counts as failure.
    return was_set is True
def release_lock(lock_key: str, lock_value: str) -> bool:
    """Release a distributed lock, but only if it still holds ``lock_value``.

    The comparison and the delete are both done inside one Lua script sent
    through a single ``EVAL`` call, so a lock whose stored value differs from
    ``lock_value`` is left in place.

    Args:
        lock_key: Redis key that represents the lock.
        lock_value: Value that was stored when the lock was acquired.

    Returns:
        True when the script reports that one key was deleted. False when the
        stored value did not match, or when the call raised (the error is
        printed and swallowed).
    """
    # Lua: delete KEYS[1] only when its current value equals ARGV[1]; otherwise return 0.
    unlock_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
    try:
        # The literal 1 is the number of KEYS passed to the script.
        deleted = redis_client.eval(unlock_script, 1, lock_key, lock_value)
    except Exception as e:
        print(f"释放锁失败: {e}")
        return False
    return deleted == 1
def update_pool_cache(chest_id: int, pool_a: int, pool_b: int) -> None:
    """Write a chest's prize-pool balances into its Redis hash.

    The hash key is ``chest_pool:{chest_id}`` and these fields are written:
    ``pool_a``, ``pool_b``, ``total`` (``pool_a + pool_b``) and ``updated_at``
    (current Unix time in whole seconds, stored as a string).

    Args:
        chest_id: Identifier used to build the hash key.
        pool_a: Value stored in the ``pool_a`` field.
        pool_b: Value stored in the ``pool_b`` field.

    Returns:
        None. Any exception raised while writing is printed and swallowed, so
        a failed cache update does not propagate to the caller.
    """
    # Function-scope import replaces the former inline __import__('time') call;
    # the module's import header does not bring in `time`.
    import time

    try:
        key = f"chest_pool:{chest_id}"
        redis_client.hset(key, mapping={
            "pool_a": pool_a,
            "pool_b": pool_b,
            "total": pool_a + pool_b,
            "updated_at": str(int(time.time())),
        })
    except Exception as e:
        print(f"更新奖池缓存失败: {e}")
def get_pool_cache(chest_id: int) -> dict:
    """Read a chest's cached prize-pool balances from Redis.

    Looks up the hash at ``chest_pool:{chest_id}`` and converts the
    ``pool_a``, ``pool_b`` and ``total`` fields to ``int``. A field missing
    from a non-empty hash is reported as 0.

    Args:
        chest_id: Identifier used to build the hash key.

    Returns:
        A dict with the keys ``pool_a``, ``pool_b`` and ``total``. All three
        are 0 when the hash is empty or absent, or when reading or converting
        raised (the error is printed and swallowed).
    """
    try:
        cached = redis_client.hgetall(f"chest_pool:{chest_id}")
        if not cached:
            return {"pool_a": 0, "pool_b": 0, "total": 0}
        # Convert the three numeric fields in a fixed order; other fields are ignored.
        return {
            field: int(cached.get(field, 0))
            for field in ("pool_a", "pool_b", "total")
        }
    except Exception as e:
        print(f"获取奖池缓存失败: {e}")
        return {"pool_a": 0, "pool_b": 0, "total": 0}