From c06a4b759c4d1248077d1ab710c5a2e3f19df0a7 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 21 Aug 2025 08:14:21 +0800 Subject: [PATCH] fix redis --- app/helper/redis.py | 177 ++++++++++++++++---------------------------- 1 file changed, 63 insertions(+), 114 deletions(-) diff --git a/app/helper/redis.py b/app/helper/redis.py index ad06cef9..a0c05350 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -13,6 +13,61 @@ from app.schemas import ConfigChangeEventData from app.schemas.types import EventType from app.utils.singleton import Singleton +# 类型缓存集合,针对非容器简单类型 +_complex_serializable_types = set() +_simple_serializable_types = set() + + +def serialize(value: Any) -> bytes: + """ + 将值序列化为二进制数据,根据序列化方式标识格式 + """ + + def _is_container_type(t): + """ + 判断是否为容器类型 + """ + return t in (list, dict, tuple, set) + + vt = type(value) + # 针对非容器类型使用缓存策略 + if not _is_container_type(vt): + # 如果已知需要复杂序列化 + if vt in _complex_serializable_types: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + # 如果已知可以简单序列化 + if vt in _simple_serializable_types: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 + try: + json_data = json.dumps(value).encode("utf-8") + _simple_serializable_types.add(vt) + return b"JSON" + b"\x00" + json_data + except TypeError: + _complex_serializable_types.add(vt) + return b"PICKLE" + b"\x00" + pickle.dumps(value) + else: + # 针对容器类型,每次尝试简单序列化,不使用缓存 + try: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + except TypeError: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + + +def deserialize(value: bytes) -> Any: + """ + 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 + """ + format_marker, data = value.split(b"\x00", 1) + if format_marker == b"JSON": + return json.loads(data.decode("utf-8")) + elif format_marker == b"PICKLE": + return pickle.loads(data) + else: + raise ValueError("Unknown serialization format") + class RedisHelper(metaclass=Singleton): """ @@ -25,10 +80,6 @@ class RedisHelper(metaclass=Singleton): - 提供键名生成和区域管理功能 """ - # 类型缓存集合,针对非容器简单类型 - _complex_serializable_types = set() - _simple_serializable_types = set() - def __init__(self): """ 初始化Redis助手实例 @@ -88,57 +139,6 @@ class RedisHelper(metaclass=Singleton): except Exception as e: logger.error(f"Failed to set Redis maxmemory or policy: {e}") - @staticmethod - def is_container_type(t): - """ - 判断是否为容器类型 - """ - return t in (list, dict, tuple, set) - - @classmethod - def serialize(cls, value: Any) -> bytes: - """ - 将值序列化为二进制数据,根据序列化方式标识格式 - """ - vt = type(value) - # 针对非容器类型使用缓存策略 - if not cls.is_container_type(vt): - # 如果已知需要复杂序列化 - if vt in cls._complex_serializable_types: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 如果已知可以简单序列化 - if vt in cls._simple_serializable_types: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 - try: - json_data = json.dumps(value).encode("utf-8") - cls._simple_serializable_types.add(vt) - return b"JSON" + b"\x00" + json_data - except TypeError: - cls._complex_serializable_types.add(vt) - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 针对容器类型,每次尝试简单序列化,不使用缓存 - else: - try: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - except TypeError: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - - @classmethod - def deserialize(cls, value: bytes) -> Any: - """ - 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 - """ - format_marker, data = value.split(b"\x00", 1) - if format_marker == b"JSON": - return json.loads(data.decode("utf-8")) - elif format_marker == b"PICKLE": - return pickle.loads(data) - else: - raise ValueError("Unknown serialization format") - @staticmethod def get_region(region: Optional[str] = "DEFAULT"): """ @@ -169,7 +169,7 @@ class RedisHelper(metaclass=Singleton): self._connect() redis_key = self.get_redis_key(region, key) # 对值进行序列化 - serialized_value = self.serialize(value) + serialized_value = serialize(value) kwargs.pop("maxsize", None) self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) except Exception as e: @@ -204,7 +204,7 @@ class RedisHelper(metaclass=Singleton): redis_key = self.get_redis_key(region, key) value = self.client.get(redis_key) if value is not None: - return self.deserialize(value) + return deserialize(value) return None except Exception as e: logger.error(f"Failed to get key: {key} in region: {region}, error: {e}") @@ -261,12 +261,12 @@ class RedisHelper(metaclass=Singleton): for key in self.client.scan_iter(redis_key): value = self.client.get(key) if value is not None: - yield key, self.deserialize(value) + yield key, deserialize(value) else: for key in self.client.scan_iter("*"): value = self.client.get(key) if value is not None: - yield key, self.deserialize(value) + yield key, deserialize(value) except Exception as e: logger.error(f"Failed to get items from Redis, region: {region}, error: {e}") @@ -368,57 +368,6 @@ class AsyncRedisHelper(metaclass=Singleton): except Exception as e: logger.error(f"Failed to set Redis maxmemory or policy (async): {e}") - @staticmethod - def is_container_type(t): - """ - 判断是否为容器类型 - """ - return t in (list, dict, tuple, set) - - @classmethod - def serialize(cls, value: Any) -> bytes: - """ - 将值序列化为二进制数据,根据序列化方式标识格式 - """ - vt = type(value) - # 针对非容器类型使用缓存策略 - if not cls.is_container_type(vt): - # 如果已知需要复杂序列化 - if vt in cls._complex_serializable_types: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 如果已知可以简单序列化 - if vt in cls._simple_serializable_types: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 - try: - json_data = json.dumps(value).encode("utf-8") - cls._simple_serializable_types.add(vt) - return b"JSON" + b"\x00" + json_data - except TypeError: - cls._complex_serializable_types.add(vt) - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 针对容器类型,每次尝试简单序列化,不使用缓存 - else: - try: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - except TypeError: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - - @classmethod - def deserialize(cls, value: bytes) -> Any: - """ - 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 - """ - format_marker, data = value.split(b"\x00", 1) - if format_marker == b"JSON": - return json.loads(data.decode("utf-8")) - elif format_marker == b"PICKLE": - return pickle.loads(data) - else: - raise ValueError("Unknown serialization format") - @staticmethod def get_region(region: Optional[str] = "DEFAULT"): """ @@ -449,7 +398,7 @@ class AsyncRedisHelper(metaclass=Singleton): await self._connect() redis_key = self.get_redis_key(region, key) # 对值进行序列化 - serialized_value = self.serialize(value) + serialized_value = serialize(value) kwargs.pop("maxsize", None) await self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) except Exception as e: @@ -485,7 +434,7 @@ class AsyncRedisHelper(metaclass=Singleton): redis_key = self.get_redis_key(region, key) value = await self.client.get(redis_key) if value is not None: - return self.deserialize(value) + return deserialize(value) return None except Exception as e: logger.error(f"Failed to get key (async): {key} in region: {region}, error: {e}") @@ -542,12 +491,12 @@ class AsyncRedisHelper(metaclass=Singleton): async for key in self.client.scan_iter(redis_key): value = await self.client.get(key) if value is not None: - yield key, self.deserialize(value) + yield key, deserialize(value) else: async for key in self.client.scan_iter("*"): value = await self.client.get(key) if value is not None: - yield key, self.deserialize(value) + yield key, deserialize(value) except Exception as e: logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")