import redis import json from typing import Optional, Any from app.config.settings import settings class RedisCache: """Redis缓存类""" def __init__(self): """初始化Redis连接""" self.redis_client = redis.from_url( settings.REDIS_URL, decode_responses=True, socket_connect_timeout=5, socket_timeout=5, retry_on_timeout=True, health_check_interval=30, max_connections=50 ) def get(self, key: str) -> Optional[Any]: """获取缓存值""" try: value = self.redis_client.get(key) if value: return json.loads(value) return None except Exception as e: print(f"Redis get error: {e}") return None def set(self, key: str, value: Any, expire: Optional[int] = None) -> bool: """设置缓存值""" try: value_str = json.dumps(value) if expire: return self.redis_client.setex(key, expire, value_str) else: return self.redis_client.set(key, value_str) except Exception as e: print(f"Redis set error: {e}") return False def delete(self, key: str) -> bool: """删除缓存值""" try: return bool(self.redis_client.delete(key)) except Exception as e: print(f"Redis delete error: {e}") return False def exists(self, key: str) -> bool: """检查缓存是否存在""" try: return bool(self.redis_client.exists(key)) except Exception as e: print(f"Redis exists error: {e}") return False def increment(self, key: str, amount: int = 1) -> Optional[int]: """递增计数器""" try: return self.redis_client.incrby(key, amount) except Exception as e: print(f"Redis increment error: {e}") return None def decrement(self, key: str, amount: int = 1) -> Optional[int]: """递减计数器""" try: return self.redis_client.decrby(key, amount) except Exception as e: print(f"Redis decrement error: {e}") return None # 创建全局缓存实例 cache = RedisCache()