diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 0d88437c..a44814e6 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -7,10 +7,9 @@ from collections.abc import Callable from pathlib import Path from typing import Optional, Any, Tuple, List, Set, Union, Dict -from fastapi.concurrency import run_in_threadpool - import aiofiles from anyio import Path as AsyncPath +from fastapi.concurrency import run_in_threadpool from qbittorrentapi import TorrentFilesList from transmission_rpc import File @@ -23,6 +22,7 @@ from app.core.plugin import PluginManager from app.db.message_oper import MessageOper from app.db.user_oper import UserOper from app.helper.message import MessageHelper, MessageQueueManager, MessageTemplateHelper +from app.helper.redis import RedisHelper from app.helper.service import ServiceConfigHelper from app.log import logger from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \ @@ -48,12 +48,29 @@ class ChainBase(metaclass=ABCMeta): send_callback=self.run_module ) self.pluginmanager = PluginManager() + # 初始化Redis缓存助手 + self._redis_helper = None + if settings.CACHE_BACKEND_TYPE == "redis": + try: + self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL) + except RuntimeError as e: + logger.warning(f"Redis缓存初始化失败,将使用本地缓存: {e}") - @staticmethod - def load_cache(filename: str) -> Any: + def load_cache(self, filename: str) -> Any: """ - 从本地加载缓存 + 加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) """ + # 如果Redis可用,优先从Redis读取 + if self._redis_helper: + try: + cache_data = self._redis_helper.get(filename, region="chain_cache") + if cache_data is not None: + logger.debug(f"从Redis加载缓存: {filename}") + return cache_data + except Exception as e: + logger.warning(f"从Redis加载缓存 {filename} 失败: {e}") + + # 从本地文件读取(兼容存量数据) cache_path = settings.TEMP_PATH / filename if cache_path.exists(): try: @@ -63,11 +80,21 @@ class ChainBase(metaclass=ABCMeta): logger.error(f"加载缓存 {filename} 出错:{str(err)}") return None - @staticmethod - async def async_load_cache(filename: str) -> Any: + async def async_load_cache(self, filename: str) -> Any: """ - 异步从本地加载缓存 + 异步加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) """ + # 如果Redis可用,优先从Redis读取 + if self._redis_helper: + try: + cache_data = self._redis_helper.get(filename, region="chain_cache") + if cache_data is not None: + logger.debug(f"从Redis异步加载缓存: {filename}") + return cache_data + except Exception as e: + logger.warning(f"从Redis异步加载缓存 {filename} 失败: {e}") + + # 从本地文件读取(兼容存量数据) cache_path = settings.TEMP_PATH / filename if cache_path.exists(): try: @@ -75,51 +102,88 @@ class ChainBase(metaclass=ABCMeta): content = await f.read() return pickle.loads(content) except Exception as err: - logger.error(f"加载缓存 {filename} 出错:{str(err)}") + logger.error(f"异步加载缓存 {filename} 出错:{str(err)}") return None - @staticmethod - async def async_save_cache(cache: Any, filename: str) -> None: + async def async_save_cache(self, cache: Any, filename: str) -> None: """ - 异步保存缓存到本地 + 异步保存缓存,优先保存到Redis,同时保存到本地作为备份 """ - try: - async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f: - await f.write(pickle.dumps(cache)) - except Exception as err: - logger.error(f"保存缓存 {filename} 出错:{str(err)}") + # 如果Redis可用,优先保存到Redis + if self._redis_helper: + try: + self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache") + logger.debug(f"异步保存缓存到Redis: {filename}") + except Exception as e: + logger.warning(f"异步保存缓存到Redis失败: {e}") + else: + # 保存到本地 + try: + async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f: + await f.write(pickle.dumps(cache)) + except Exception as err: + logger.error(f"异步保存缓存到本地 {filename} 出错:{str(err)}") - @staticmethod - def save_cache(cache: Any, filename: str) -> None: + def save_cache(self, cache: Any, filename: str) -> None: """ - 保存缓存到本地 + 保存缓存,优先保存到Redis,同时保存到本地作为备份 """ - try: - with open(settings.TEMP_PATH / filename, 'wb') as f: - pickle.dump(cache, f) # noqa - except Exception as err: - logger.error(f"保存缓存 {filename} 出错:{str(err)}") + # 如果Redis可用,优先保存到Redis + if self._redis_helper: + try: + self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache") + logger.debug(f"保存缓存到Redis: {filename}") + except Exception as e: + logger.warning(f"保存缓存到Redis失败: {e}") + else: + # 保存到本地 + try: + with open(settings.TEMP_PATH / filename, 'wb') as f: + pickle.dump(cache, f) # noqa + except Exception as err: + logger.error(f"保存缓存到本地 {filename} 出错:{str(err)}") - @staticmethod - def remove_cache(filename: str) -> None: + def remove_cache(self, filename: str) -> None: """ - 删除本地缓存 + 删除缓存,同时删除Redis和本地缓存 """ + # 如果Redis可用,删除Redis缓存 + if self._redis_helper: + try: + self._redis_helper.delete(filename, region="chain_cache") + logger.debug(f"删除Redis缓存: {filename}") + except Exception as e: + logger.warning(f"删除Redis缓存失败: {e}") + + # 删除本地缓存 cache_path = settings.TEMP_PATH / filename if cache_path.exists(): - cache_path.unlink() + try: + cache_path.unlink() + logger.debug(f"删除本地缓存: {filename}") + except Exception as e: + logger.warning(f"删除本地缓存失败: {e}") - @staticmethod - async def async_remove_cache(filename: str) -> None: + async def async_remove_cache(self, filename: str) -> None: """ - 异步删除本地缓存 + 异步删除缓存,同时删除Redis和本地缓存 """ + # 如果Redis可用,删除Redis缓存 + if self._redis_helper: + try: + self._redis_helper.delete(filename, region="chain_cache") + logger.debug(f"异步删除Redis缓存: {filename}") + except Exception as e: + logger.warning(f"异步删除Redis缓存失败: {e}") + + # 删除本地缓存 cache_path = AsyncPath(settings.TEMP_PATH) / filename if await cache_path.exists(): try: await cache_path.unlink() + logger.debug(f"异步删除本地缓存: {filename}") except Exception as err: - logger.error(f"异步删除缓存 {filename} 出错:{str(err)}") + logger.error(f"异步删除本地缓存 {filename} 出错:{str(err)}") @staticmethod def __is_valid_empty(ret): @@ -923,12 +987,12 @@ class ChainBase(metaclass=ABCMeta): immediately=True if message.userid else False) async def async_post_message(self, - message: Optional[Notification] = None, - meta: Optional[MetaBase] = None, - mediainfo: Optional[MediaInfo] = None, - torrentinfo: Optional[TorrentInfo] = None, - transferinfo: Optional[TransferInfo] = None, - **kwargs) -> None: + message: Optional[Notification] = None, + meta: Optional[MetaBase] = None, + mediainfo: Optional[MediaInfo] = None, + torrentinfo: Optional[TorrentInfo] = None, + transferinfo: Optional[TransferInfo] = None, + **kwargs) -> None: """ 异步发送消息 :param message: Notification实例 @@ -991,15 +1055,16 @@ class ChainBase(metaclass=ABCMeta): break # 按设定发送 await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, - data={**send_message.dict(), "type": send_message.mtype}) + data={**send_message.dict(), "type": send_message.mtype}) await self.messagequeue.async_send_message("post_message", message=send_message) if not send_orignal: return # 发送消息事件 - await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype}) + await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, + data={**message.dict(), "type": message.mtype}) # 按原消息发送 await self.messagequeue.async_send_message("post_message", message=message, - immediately=True if message.userid else False) + immediately=True if message.userid else False) def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: """ diff --git a/app/core/cache.py b/app/core/cache.py index 9e957d52..d1bb7758 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -1,17 +1,14 @@ import inspect -import json -import pickle import threading from abc import ABC, abstractmethod from functools import wraps from typing import Any, Dict, Optional -from urllib.parse import quote -import redis from cachetools import TTLCache from cachetools.keys import hashkey from app.core.config import settings +from app.helper.redis import RedisHelper from app.log import logger # 默认缓存区 @@ -250,10 +247,6 @@ class RedisBackend(CacheBackend): - Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存 """ - # 类型缓存集合,针对非容器简单类型 - _complex_serializable_types = set() - _simple_serializable_types = set() - def __init__(self, redis_url: Optional[str] = "redis://localhost", ttl: Optional[int] = 1800): """ 初始化 Redis 缓存实例 @@ -261,101 +254,13 @@ class RedisBackend(CacheBackend): :param redis_url: Redis 服务的 URL :param ttl: 缓存的存活时间,单位秒 """ - self.redis_url = redis_url self.ttl = ttl + self._redis_helper = None try: - self.client = redis.Redis.from_url( - redis_url, - decode_responses=False, - socket_timeout=30, - socket_connect_timeout=5, - health_check_interval=60, - ) - # 测试连接,确保 Redis 可用 - self.client.ping() - logger.debug(f"Successfully connected to Redis") - self.set_memory_limit() - except Exception as e: - logger.error(f"Failed to connect to Redis: {e}") - raise RuntimeError("Redis connection failed") from e - - def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): - """ - 动态设置 Redis 最大内存和内存淘汰策略 - :param policy: 淘汰策略(如 'allkeys-lru') - """ - try: - # 如果有显式值,则直接使用,为 0 时说明不限制,如果未配置,开启 BIG_MEMORY_MODE 时为 "1024mb",未开启时为 "256mb" - maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb") - self.client.config_set("maxmemory", maxmemory) - self.client.config_set("maxmemory-policy", policy) - logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}") - except Exception as e: - logger.error(f"Failed to set Redis maxmemory or policy: {e}") - - @staticmethod - def is_container_type(t): - return t in (list, dict, tuple, set) - - @classmethod - def serialize(cls, value: Any) -> bytes: - """ - 将值序列化为二进制数据,根据序列化方式标识格式 - """ - vt = type(value) - # 针对非容器类型使用缓存策略 - if not cls.is_container_type(vt): - # 如果已知需要复杂序列化 - if vt in cls._complex_serializable_types: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 如果已知可以简单序列化 - if vt in cls._simple_serializable_types: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 - try: - json_data = json.dumps(value).encode("utf-8") - cls._simple_serializable_types.add(vt) - return b"JSON" + b"\x00" + json_data - except TypeError: - cls._complex_serializable_types.add(vt) - return b"PICKLE" + b"\x00" + pickle.dumps(value) - # 针对容器类型,每次尝试简单序列化,不使用缓存 - else: - try: - json_data = json.dumps(value).encode("utf-8") - return b"JSON" + b"\x00" + json_data - except TypeError: - return b"PICKLE" + b"\x00" + pickle.dumps(value) - - @classmethod - def deserialize(cls, value: bytes) -> Any: - """ - 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 - """ - format_marker, data = value.split(b"\x00", 1) - if format_marker == b"JSON": - return json.loads(data.decode("utf-8")) - elif format_marker == b"PICKLE": - return pickle.loads(data) - else: - raise ValueError("Unknown serialization format") - - # @staticmethod - # def serialize(value: Any) -> bytes: - # return msgpack.packb(value, use_bin_type=True) - # - # @staticmethod - # def deserialize(value: bytes) -> Any: - # return msgpack.unpackb(value, raw=False) - - def get_redis_key(self, region: str, key: str) -> str: - """ - 获取缓存 Key - """ - # 使用 region 作为缓存键的一部分 - region = self.get_region(quote(region)) - return f"{region}:key:{quote(key)}" + self.redis_helper = RedisHelper(redis_url=redis_url) + except RuntimeError as e: + logger.warning(f"Redis缓存初始化失败: {e}") + raise e def set(self, key: str, value: Any, ttl: Optional[int] = None, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: @@ -368,15 +273,8 @@ class RedisBackend(CacheBackend): :param region: 缓存的区 :param kwargs: kwargs """ - try: - ttl = ttl or self.ttl - redis_key = self.get_redis_key(region, key) - # 对值进行序列化 - serialized_value = self.serialize(value) - kwargs.pop("maxsize", None) - self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) - except Exception as e: - logger.error(f"Failed to set key: {key} in region: {region}, error: {e}") + ttl = ttl or self.ttl + self.redis_helper.set(key, value, ttl=ttl, region=region, **kwargs) def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: """ @@ -386,12 +284,7 @@ class RedisBackend(CacheBackend): :param region: 缓存的区 :return: 存在返回 True,否则返回 False """ - try: - redis_key = self.get_redis_key(region, key) - return self.client.exists(redis_key) == 1 - except Exception as e: - logger.error(f"Failed to exists key: {key} region: {region}, error: {e}") - return False + return self.redis_helper.exists(key, region=region) def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]: """ @@ -401,15 +294,7 @@ class RedisBackend(CacheBackend): :param region: 缓存的区 :return: 返回缓存的值,如果缓存不存在返回 None """ - try: - redis_key = self.get_redis_key(region, key) - value = self.client.get(redis_key) - if value is not None: - return self.deserialize(value) # noqa - return None - except Exception as e: - logger.error(f"Failed to get key: {key} in region: {region}, error: {e}") - return None + return self.redis_helper.get(key, region=region) def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: """ @@ -418,11 +303,7 @@ class RedisBackend(CacheBackend): :param key: 缓存的键 :param region: 缓存的区 """ - try: - redis_key = self.get_redis_key(region, key) - self.client.delete(redis_key) - except Exception as e: - logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}") + self.redis_helper.delete(key, region=region) def clear(self, region: Optional[str] = None) -> None: """ @@ -430,28 +311,13 @@ class RedisBackend(CacheBackend): :param region: 缓存的区 """ - try: - if region: - cache_region = self.get_region(quote(region)) - redis_key = f"{cache_region}:key:*" - # self.client.delete(*self.client.keys(redis_key)) - with self.client.pipeline() as pipe: - for key in self.client.scan_iter(redis_key): - pipe.delete(key) - pipe.execute() - logger.info(f"Cleared Redis cache for region: {region}") - else: - self.client.flushdb() - logger.info("Cleared all Redis cache") - except Exception as e: - logger.error(f"Failed to clear cache, region: {region}, error: {e}") + self.redis_helper.clear(region=region) def close(self) -> None: """ 关闭 Redis 客户端的连接池 """ - if self.client: - self.client.close() + self.redis_helper.close() def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) -> CacheBackend: diff --git a/app/core/config.py b/app/core/config.py index fa1f11a2..68d5192b 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -136,7 +136,7 @@ class ConfigModel(BaseModel): # 缓存类型,支持 cachetools 和 redis,默认使用 cachetools CACHE_BACKEND_TYPE: str = "cachetools" # 缓存连接字符串,仅外部缓存(如 Redis、Memcached)需要 - CACHE_BACKEND_URL: Optional[str] = None + CACHE_BACKEND_URL: Optional[str] = "redis://localhost:6379" # Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb" CACHE_REDIS_MAXMEMORY: Optional[str] = None diff --git a/app/helper/redis.py b/app/helper/redis.py index ca719093..207f94b3 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -6,10 +6,14 @@ from urllib.parse import quote import redis from app.core.config import settings +from app.core.event import eventmanager, Event from app.log import logger +from app.schemas import ConfigChangeEventData +from app.schemas.types import EventType +from app.utils.singleton import SingletonClass -class RedisHelper: +class RedisHelper(metaclass=SingletonClass): """ Redis连接和操作助手类 @@ -54,6 +58,20 @@ class RedisHelper: logger.error(f"Failed to connect to Redis: {e}") raise RuntimeError("Redis connection failed") from e + @eventmanager.register(EventType.ConfigChanged) + def handle_config_changed(self, event: Event): + """ + 处理配置变更事件,更新Redis设置 + :param event: 事件对象 + """ + if not event: + return + event_data: ConfigChangeEventData = event.event_data + if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']: + return + logger.info("配置变更,重连Redis...") + self._connect() + def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): """ 动态设置Redis最大内存和内存淘汰策略