diff --git a/app/helper/redis.py b/app/helper/redis.py new file mode 100644 index 00000000..ca719093 --- /dev/null +++ b/app/helper/redis.py @@ -0,0 +1,231 @@ +import json +import pickle +from typing import Any, Optional +from urllib.parse import quote + +import redis + +from app.core.config import settings +from app.log import logger + + +class RedisHelper: + """ + Redis连接和操作助手类 + + 特性: + - 管理Redis连接池和客户端 + - 提供序列化和反序列化功能 + - 支持内存限制和淘汰策略设置 + - 提供键名生成和区域管理功能 + """ + + # 类型缓存集合,针对非容器简单类型 + _complex_serializable_types = set() + _simple_serializable_types = set() + + def __init__(self, redis_url: Optional[str] = "redis://localhost"): + """ + 初始化Redis助手实例 + + :param redis_url: Redis服务的URL + """ + self.redis_url = redis_url + self.client = None + self._connect() + + def _connect(self): + """ + 建立Redis连接 + """ + try: + self.client = redis.Redis.from_url( + self.redis_url, + decode_responses=False, + socket_timeout=30, + socket_connect_timeout=5, + health_check_interval=60, + ) + # 测试连接,确保Redis可用 + self.client.ping() + logger.debug(f"Successfully connected to Redis") + self.set_memory_limit() + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise RuntimeError("Redis connection failed") from e + + def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): + """ + 动态设置Redis最大内存和内存淘汰策略 + + :param policy: 淘汰策略(如'allkeys-lru') + """ + try: + # 如果有显式值,则直接使用,为0时说明不限制,如果未配置,开启BIG_MEMORY_MODE时为"1024mb",未开启时为"256mb" + maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb") + self.client.config_set("maxmemory", maxmemory) + self.client.config_set("maxmemory-policy", policy) + logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}") + except Exception as e: + logger.error(f"Failed to set Redis maxmemory or policy: {e}") + + @staticmethod + def is_container_type(t): + """ + 判断是否为容器类型 + """ + return t in (list, dict, tuple, set) + + @classmethod + def serialize(cls, value: Any) -> bytes: + """ + 将值序列化为二进制数据,根据序列化方式标识格式 + """ + vt = type(value) + # 针对非容器类型使用缓存策略 + if not cls.is_container_type(vt): + # 如果已知需要复杂序列化 + if vt in cls._complex_serializable_types: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + # 如果已知可以简单序列化 + if vt in cls._simple_serializable_types: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 + try: + json_data = json.dumps(value).encode("utf-8") + cls._simple_serializable_types.add(vt) + return b"JSON" + b"\x00" + json_data + except TypeError: + cls._complex_serializable_types.add(vt) + return b"PICKLE" + b"\x00" + pickle.dumps(value) + # 针对容器类型,每次尝试简单序列化,不使用缓存 + else: + try: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + except TypeError: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + + @classmethod + def deserialize(cls, value: bytes) -> Any: + """ + 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 + """ + format_marker, data = value.split(b"\x00", 1) + if format_marker == b"JSON": + return json.loads(data.decode("utf-8")) + elif format_marker == b"PICKLE": + return pickle.loads(data) + else: + raise ValueError("Unknown serialization format") + + @staticmethod + def get_region(region: Optional[str] = "DEFAULT"): + """ + 获取缓存的区 + """ + return f"region:{region}" if region else "region:default" + + def get_redis_key(self, region: str, key: str) -> str: + """ + 获取缓存Key + """ + # 使用region作为缓存键的一部分 + region = self.get_region(quote(region)) + return f"{region}:key:{quote(key)}" + + def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = "DEFAULT", **kwargs) -> None: + """ + 设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param ttl: 缓存的存活时间,单位秒 + :param region: 缓存的区 + :param kwargs: 其他参数 + """ + try: + redis_key = self.get_redis_key(region, key) + # 对值进行序列化 + serialized_value = self.serialize(value) + kwargs.pop("maxsize", None) + self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) + except Exception as e: + logger.error(f"Failed to set key: {key} in region: {region}, error: {e}") + + def exists(self, key: str, region: Optional[str] = "DEFAULT") -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回True,否则返回False + """ + try: + redis_key = self.get_redis_key(region, key) + return self.client.exists(redis_key) == 1 + except Exception as e: + logger.error(f"Failed to exists key: {key} region: {region}, error: {e}") + return False + + def get(self, key: str, region: Optional[str] = "DEFAULT") -> Optional[Any]: + """ + 获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回None + """ + try: + redis_key = self.get_redis_key(region, key) + value = self.client.get(redis_key) + if value is not None: + return self.deserialize(value) + return None + except Exception as e: + logger.error(f"Failed to get key: {key} in region: {region}, error: {e}") + return None + + def delete(self, key: str, region: Optional[str] = "DEFAULT") -> None: + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + try: + redis_key = self.get_redis_key(region, key) + self.client.delete(redis_key) + except Exception as e: + logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}") + + def clear(self, region: Optional[str] = None) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + try: + if region: + cache_region = self.get_region(quote(region)) + redis_key = f"{cache_region}:key:*" + with self.client.pipeline() as pipe: + for key in self.client.scan_iter(redis_key): + pipe.delete(key) + pipe.execute() + logger.info(f"Cleared Redis cache for region: {region}") + else: + self.client.flushdb() + logger.info("Cleared all Redis cache") + except Exception as e: + logger.error(f"Failed to clear cache, region: {region}, error: {e}") + + def close(self) -> None: + """ + 关闭Redis客户端的连接池 + """ + if self.client: + self.client.close() + logger.debug("Redis connection closed")