fix redis

This commit is contained in:
jxxghp
2025-08-21 08:14:21 +08:00
parent f05a23a490
commit c06a4b759c

View File

@@ -13,6 +13,61 @@ from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
# 类型缓存集合,针对非容器简单类型
_complex_serializable_types = set()
_simple_serializable_types = set()
def serialize(value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
def _is_container_type(t):
"""
判断是否为容器类型
"""
return t in (list, dict, tuple, set)
vt = type(value)
# 针对非容器类型使用缓存策略
if not _is_container_type(vt):
# 如果已知需要复杂序列化
if vt in _complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in _simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
_simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
_complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
else:
# 针对容器类型,每次尝试简单序列化,不使用缓存
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
def deserialize(value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
class RedisHelper(metaclass=Singleton):
"""
@@ -25,10 +80,6 @@ class RedisHelper(metaclass=Singleton):
- 提供键名生成和区域管理功能
"""
# 类型缓存集合,针对非容器简单类型
_complex_serializable_types = set()
_simple_serializable_types = set()
def __init__(self):
"""
初始化Redis助手实例
@@ -88,57 +139,6 @@ class RedisHelper(metaclass=Singleton):
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def is_container_type(t):
"""
判断是否为容器类型
"""
return t in (list, dict, tuple, set)
@classmethod
def serialize(cls, value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
vt = type(value)
# 针对非容器类型使用缓存策略
if not cls.is_container_type(vt):
# 如果已知需要复杂序列化
if vt in cls._complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in cls._simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
cls._simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
cls._complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 针对容器类型,每次尝试简单序列化,不使用缓存
else:
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@classmethod
def deserialize(cls, value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
"""
@@ -169,7 +169,7 @@ class RedisHelper(metaclass=Singleton):
self._connect()
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
@@ -204,7 +204,7 @@ class RedisHelper(metaclass=Singleton):
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return self.deserialize(value)
return deserialize(value)
return None
except Exception as e:
logger.error(f"Failed to get key: {key} in region: {region}, error: {e}")
@@ -261,12 +261,12 @@ class RedisHelper(metaclass=Singleton):
for key in self.client.scan_iter(redis_key):
value = self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
yield key, deserialize(value)
else:
for key in self.client.scan_iter("*"):
value = self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
yield key, deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")
@@ -368,57 +368,6 @@ class AsyncRedisHelper(metaclass=Singleton):
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy (async): {e}")
@staticmethod
def is_container_type(t):
"""
判断是否为容器类型
"""
return t in (list, dict, tuple, set)
@classmethod
def serialize(cls, value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
vt = type(value)
# 针对非容器类型使用缓存策略
if not cls.is_container_type(vt):
# 如果已知需要复杂序列化
if vt in cls._complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in cls._simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
cls._simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
cls._complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 针对容器类型,每次尝试简单序列化,不使用缓存
else:
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@classmethod
def deserialize(cls, value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
"""
@@ -449,7 +398,7 @@ class AsyncRedisHelper(metaclass=Singleton):
await self._connect()
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
await self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
@@ -485,7 +434,7 @@ class AsyncRedisHelper(metaclass=Singleton):
redis_key = self.get_redis_key(region, key)
value = await self.client.get(redis_key)
if value is not None:
return self.deserialize(value)
return deserialize(value)
return None
except Exception as e:
logger.error(f"Failed to get key (async): {key} in region: {region}, error: {e}")
@@ -542,12 +491,12 @@ class AsyncRedisHelper(metaclass=Singleton):
async for key in self.client.scan_iter(redis_key):
value = await self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
yield key, deserialize(value)
else:
async for key in self.client.scan_iter("*"):
value = await self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
yield key, deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")