feat(config): 优化配置变更事件处理机制

This commit is contained in:
Attente
2025-11-27 23:17:34 +08:00
parent d1d7b8ce55
commit ca5ec8af0f
23 changed files with 205 additions and 335 deletions

View File

@@ -248,11 +248,9 @@ async def set_env_setting(env: dict,
)
if success_updates:
for key in success_updates.keys():
# 发送配置变更事件
await eventmanager.async_send_event(etype=EventType.ConfigChanged, data=ConfigChangeEventData(
key=key,
value=getattr(settings, key, None),
key=success_updates.keys(),
change_type="update"
))

View File

@@ -6,11 +6,11 @@ import importlib.util
import inspect
import os
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import threading
from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple
from fastapi import HTTPException
@@ -20,7 +20,7 @@ from watchfiles import watch
from app import schemas
from app.core.cache import fresh, async_fresh
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
@@ -28,16 +28,16 @@ from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.mixins import ConfigReloadMixin
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
class PluginManager(metaclass=Singleton):
"""
插件管理器
"""
class PluginManager(ConfigReloadMixin, metaclass=Singleton):
"""插件管理器"""
CONFIG_WATCH = {"DEV", "PLUGIN_AUTO_RELOAD"}
def __init__(self):
# 插件列表
@@ -250,20 +250,12 @@ class PluginManager(metaclass=Singleton):
"""
return self._plugins
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in ['DEV', 'PLUGIN_AUTO_RELOAD']:
return
logger.info("配置变更,重新加载插件文件修改监测...")
def on_config_changed(self):
self.reload_monitor()
def get_reload_name(self) -> str:
return "插件文件修改监测"
def reload_monitor(self):
"""
重新加载插件文件修改监测

View File

@@ -14,10 +14,8 @@ from threading import Lock
from typing import Dict, Optional
from app.core.config import settings
from app.core.event import Event, eventmanager
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import Singleton
# 定义一个全局线程池执行器
@@ -69,25 +67,23 @@ def enable_doh(enable: bool):
socket.getaddrinfo = _orig_getaddrinfo
class DohHelper(metaclass=Singleton):
class DohHelper(ConfigReloadMixin, metaclass=Singleton):
"""
DoH帮助类用于处理DNS over HTTPS解析。
"""
CONFIG_WATCH = {"DOH_ENABLE", "DOH_DOMAINS", "DOH_RESOLVERS"}
def __init__(self):
enable_doh(settings.DOH_ENABLE)
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ["DOH_ENABLE", "DOH_DOMAINS", "DOH_RESOLVERS"]:
return
def on_config_changed(self):
with _doh_lock:
# DOH配置有变动的情况下清空缓存
_doh_cache.clear()
enable_doh(settings.DOH_ENABLE)
def get_reload_name(self):
return 'DoH'
def _doh_query(resolver: str, host: str) -> Optional[str]:
"""

View File

@@ -7,10 +7,8 @@ import redis
from redis.asyncio import Redis
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import Singleton
# 类型缓存集合,针对非容器简单类型
@@ -74,7 +72,7 @@ def deserialize(value: bytes) -> Any:
raise ValueError("Unknown serialization format")
class RedisHelper(metaclass=Singleton):
class RedisHelper(ConfigReloadMixin, metaclass=Singleton):
"""
Redis连接和操作助手类单例模式
@@ -84,6 +82,7 @@ class RedisHelper(metaclass=Singleton):
- 支持内存限制和淘汰策略设置
- 提供键名生成和区域管理功能
"""
CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"}
def __init__(self):
"""
@@ -114,21 +113,13 @@ class RedisHelper(metaclass=Singleton):
self.client = None
raise RuntimeError("Redis connection failed") from e
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件更新Redis设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']:
return
logger.info("配置变更重连Redis...")
def on_config_changed(self):
self.close()
self._connect()
def get_reload_name(self):
return "Redis"
def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
"""
动态设置Redis最大内存和内存淘汰策略
@@ -310,7 +301,7 @@ class RedisHelper(metaclass=Singleton):
logger.debug("Redis connection closed")
class AsyncRedisHelper(metaclass=Singleton):
class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton):
"""
异步Redis连接和操作助手类单例模式
@@ -321,6 +312,7 @@ class AsyncRedisHelper(metaclass=Singleton):
- 提供键名生成和区域管理功能
- 所有操作都是异步的
"""
CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"}
def __init__(self):
"""
@@ -351,21 +343,13 @@ class AsyncRedisHelper(metaclass=Singleton):
self.client = None
raise RuntimeError("Redis async connection failed") from e
@eventmanager.register(EventType.ConfigChanged)
async def handle_config_changed(self, event: Event):
"""
处理配置变更事件更新Redis设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']:
return
logger.info("配置变更重连Redis (async)...")
async def on_config_changed(self):
await self.close()
await self._connect()
def get_reload_name(self):
return "Redis (async)"
async def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
"""
动态设置Redis最大内存和内存淘汰策略

View File

@@ -8,35 +8,32 @@ from typing import Tuple
import docker
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.mixins import ConfigReloadMixin
from app.utils.system import SystemUtils
class SystemHelper:
class SystemHelper(ConfigReloadMixin):
"""
系统工具类,提供系统相关的操作和判断
"""
CONFIG_WATCH = {
"DEBUG",
"LOG_LEVEL",
"LOG_MAX_FILE_SIZE",
"LOG_BACKUP_COUNT",
"LOG_FILE_FORMAT",
"LOG_CONSOLE_FORMAT",
}
__system_flag_file = "/var/log/nginx/__moviepilot__"
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新日志设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['DEBUG', 'LOG_LEVEL', 'LOG_MAX_FILE_SIZE', 'LOG_BACKUP_COUNT',
'LOG_FILE_FORMAT', 'LOG_CONSOLE_FORMAT']:
return
logger.info("配置变更,更新日志设置...")
def on_config_changed(self):
logger.update_loggers()
def get_reload_name(self):
return "日志设置"
@staticmethod
def can_restart() -> bool:
"""

View File

@@ -4,15 +4,22 @@ from typing import Generic, Tuple, Union, TypeVar, Type, Dict, Optional, Callabl
from app.helper.service import ServiceConfigHelper
from app.schemas import Notification, NotificationConf, MediaServerConf, DownloaderConf
from app.schemas.types import ModuleType, DownloaderType, MediaServerType, MessageChannel, StorageSchema, \
OtherModulesType
OtherModulesType, SystemConfigKey
from app.utils.mixins import ConfigReloadMixin
class _ModuleBase(metaclass=ABCMeta):
class _ModuleBase(ConfigReloadMixin, metaclass=ABCMeta):
"""
模块基类实现对应方法在有需要时会被自动调用返回None代表不启用该模块将继续执行下一模块
输入参数与输出参数一致的,或没有输出的,可以被多个模块重复实现
"""
def on_config_changed(self):
self.init_module()
def get_reload_name(self):
return self.get_name()
@abstractmethod
def init_module(self) -> None:
"""
@@ -177,6 +184,7 @@ class _MessageBase(ServiceBase[TService, NotificationConf]):
"""
消息基类
"""
CONFIG_WATCH = {SystemConfigKey.Notifications.value}
def __init__(self):
"""
@@ -224,6 +232,7 @@ class _DownloaderBase(ServiceBase[TService, DownloaderConf]):
"""
下载器基类
"""
CONFIG_WATCH = {SystemConfigKey.Downloaders.value}
def __init__(self):
"""
@@ -287,6 +296,7 @@ class _MediaServerBase(ServiceBase[TService, MediaServerConf]):
"""
媒体服务器基类
"""
CONFIG_WATCH = {SystemConfigKey.MediaServers.value}
def get_configs(self) -> Dict[str, MediaServerConf]:
"""

View File

@@ -2,11 +2,11 @@ from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.emby.emby import Emby
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType, SystemConfigKey, EventType
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType
class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
@@ -18,20 +18,6 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
super().init_service(service_name=Emby.__name__.lower(),
service_type=lambda conf: Emby(**conf.config, sync_libraries=conf.sync_libraries))
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Emby模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Emby"

View File

@@ -2,12 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.jellyfin.jellyfin import Jellyfin
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType, SystemConfigKey, EventType
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType
class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
@@ -19,20 +19,6 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
super().init_service(service_name=Jellyfin.__name__.lower(),
service_type=lambda conf: Jellyfin(**conf.config, sync_libraries=conf.sync_libraries))
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Jellyfin模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Jellyfin"

View File

@@ -2,12 +2,12 @@ from typing import Optional, Tuple, Union, Any, List, Generator
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MediaServerBase
from app.modules.plex.plex import Plex
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType, SystemConfigKey, EventType
from app.schemas.types import MediaType, ModuleType, ChainEventType, MediaServerType
class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
@@ -19,20 +19,6 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
super().init_service(service_name=Plex.__name__.lower(),
service_type=lambda conf: Plex(**conf.config, sync_libraries=conf.sync_libraries))
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Plex模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Plex"

View File

@@ -7,13 +7,12 @@ from torrentool.torrent import Torrent
from app import schemas
from app.core.cache import FileCache
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase, _DownloaderBase
from app.modules.qbittorrent.qbittorrent import Qbittorrent
from app.schemas import TransferTorrent, DownloadingTorrent
from app.schemas.types import TorrentStatus, ModuleType, DownloaderType, SystemConfigKey, EventType
from app.schemas.types import TorrentStatus, ModuleType, DownloaderType
from app.utils.string import StringUtils
@@ -26,20 +25,6 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
super().init_service(service_name=Qbittorrent.__name__.lower(),
service_type=Qbittorrent)
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Downloaders.value]:
return
logger.info("配置变更重新加载Qbittorrent模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Qbittorrent"

View File

@@ -3,12 +3,11 @@ import re
from typing import Optional, Union, List, Tuple, Any
from app.core.context import MediaInfo, Context
from app.core.event import eventmanager, Event
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.slack.slack import Slack
from app.schemas import MessageChannel, CommingMessage, Notification, ConfigChangeEventData
from app.schemas.types import ModuleType, SystemConfigKey, EventType
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
class SlackModule(_ModuleBase, _MessageBase[Slack]):
@@ -21,20 +20,6 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
service_type=Slack)
self._channel = MessageChannel.Slack
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Slack模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Slack"

View File

@@ -1,12 +1,11 @@
from typing import Optional, Union, List, Tuple, Any
from app.core.context import MediaInfo, Context
from app.core.event import eventmanager, Event
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.synologychat.synologychat import SynologyChat
from app.schemas import MessageChannel, CommingMessage, Notification, ConfigChangeEventData
from app.schemas.types import ModuleType, SystemConfigKey, EventType
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
@@ -19,20 +18,6 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
service_type=SynologyChat)
self._channel = MessageChannel.SynologyChat
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载SynologyChat模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Synology Chat"

View File

@@ -4,13 +4,13 @@ import re
from typing import Dict, Optional, Union, List, Tuple, Any
from app.core.context import MediaInfo, Context
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.telegram.telegram import Telegram
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData, \
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, \
NotificationConf
from app.schemas.types import ModuleType, ChainEventType, SystemConfigKey, EventType
from app.schemas.types import ModuleType, ChainEventType
from app.utils.structures import DictUtils
@@ -24,20 +24,6 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
service_type=Telegram)
self._channel = MessageChannel.Telegram
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Telegram模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Telegram"

View File

@@ -7,7 +7,6 @@ import zhconv
from app import schemas
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.meta import MetaBase
from app.log import logger
from app.modules import _ModuleBase
@@ -15,7 +14,7 @@ from app.modules.themoviedb.category import CategoryHelper
from app.modules.themoviedb.scraper import TmdbScraper
from app.modules.themoviedb.tmdb_cache import TmdbCache
from app.modules.themoviedb.tmdbapi import TmdbApi
from app.schemas.types import MediaType, MediaImageType, ModuleType, MediaRecognizeType, EventType
from app.schemas.types import MediaType, MediaImageType, ModuleType, MediaRecognizeType
from app.utils.http import RequestUtils
@@ -23,6 +22,7 @@ class TheMovieDbModule(_ModuleBase):
"""
TMDB媒体信息匹配
"""
CONFIG_WATCH = {"PROXY_HOST", "TMDB_API_DOMAIN", "TMDB_API_KEY", "TMDB_LOCALE"}
# 元数据缓存
cache: TmdbCache = None
@@ -39,18 +39,7 @@ class TheMovieDbModule(_ModuleBase):
self.category = CategoryHelper()
self.scraper = TmdbScraper()
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in ["PROXY_HOST", "TMDB_API_DOMAIN", "TMDB_API_KEY", "TMDB_LOCALE"]:
return
logger.info("配置变更重新初始化TheMovieDb模块...")
def on_config_changed(self):
# 停止模块
self.stop()
# 初始化模块

View File

@@ -7,13 +7,12 @@ from transmission_rpc import File
from app import schemas
from app.core.cache import FileCache
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase, _DownloaderBase
from app.modules.transmission.transmission import Transmission
from app.schemas import TransferTorrent, DownloadingTorrent
from app.schemas.types import TorrentStatus, ModuleType, DownloaderType, SystemConfigKey, EventType
from app.schemas.types import TorrentStatus, ModuleType, DownloaderType
from app.utils.string import StringUtils
@@ -26,20 +25,6 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
super().init_service(service_name=Transmission.__name__.lower(),
service_type=Transmission)
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Downloaders.value]:
return
logger.info("配置变更重新加载Transmission模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "Transmission"

View File

@@ -2,12 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.trimemedia.trimemedia import TrimeMedia
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType, SystemConfigKey, EventType
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType
class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]):
@@ -23,20 +23,6 @@ class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]):
),
)
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更,重新加载飞牛影视模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "飞牛影视"

View File

@@ -2,12 +2,11 @@ import json
from typing import Optional, Union, List, Tuple, Any, Dict
from app.core.context import Context, MediaInfo
from app.core.event import eventmanager, Event
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.vocechat.vocechat import VoceChat
from app.schemas import MessageChannel, CommingMessage, Notification, ConfigChangeEventData
from app.schemas.types import ModuleType, SystemConfigKey, EventType
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
@@ -20,20 +19,6 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
service_type=VoceChat)
self._channel = MessageChannel.VoceChat
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载VoceChat模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "VoceChat"

View File

@@ -4,11 +4,10 @@ from typing import Union, Tuple
from pywebpush import webpush, WebPushException
from app.core.config import global_vars, settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.schemas import Notification, ConfigChangeEventData
from app.schemas.types import ModuleType, MessageChannel, SystemConfigKey, EventType
from app.schemas import Notification
from app.schemas.types import ModuleType, MessageChannel
class WebPushModule(_ModuleBase, _MessageBase):
@@ -20,20 +19,6 @@ class WebPushModule(_ModuleBase, _MessageBase):
super().init_service(service_name=self.get_name().lower())
self._channel = MessageChannel.WebPush
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载WebPush模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "WebPush"

View File

@@ -3,13 +3,13 @@ import xml.dom.minidom
from typing import Optional, Union, List, Tuple, Any, Dict
from app.core.context import Context, MediaInfo
from app.core.event import Event, eventmanager
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.modules.wechat.wechat import WeChat
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData
from app.schemas.types import ModuleType, ChainEventType, SystemConfigKey, EventType
from app.schemas.types import ModuleType, ChainEventType
from app.utils.dom import DomUtils
from app.utils.structures import DictUtils
@@ -24,20 +24,6 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
service_type=WeChat)
self._channel = MessageChannel.Wechat
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Wechat模块...")
self.init_module()
@staticmethod
def get_name() -> str:
return "微信"

View File

@@ -17,13 +17,12 @@ from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.cache import TTLCache, FileCache
from app.core.config import settings
from app.core.event import Event, eventmanager
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas import FileItem
from app.schemas.types import SystemConfigKey, EventType
from app.schemas.types import SystemConfigKey
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import SingletonClass
from app.utils.system import SystemUtils
@@ -60,10 +59,11 @@ class FileMonitorHandler(FileSystemEventHandler):
logger.error(f"on_moved 异常: {e}")
class Monitor(metaclass=SingletonClass):
class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
"""
目录监控处理链,单例模式
"""
CONFIG_WATCH = {SystemConfigKey.Directories.value}
def __init__(self):
super().__init__()
@@ -84,20 +84,12 @@ class Monitor(metaclass=SingletonClass):
# 启动目录监控和文件整理
self.init()
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Directories.value]:
return
logger.info("配置变更事件触发,重新初始化目录监控...")
def on_config_changed(self):
self.init()
def get_reload_name(self):
return "目录监控"
def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0,
last_snapshot_time: Optional[float] = None):
"""

View File

@@ -22,16 +22,17 @@ from app.chain.subscribe import SubscribeChain
from app.chain.transfer import TransferChain
from app.chain.workflow import WorkflowChain
from app.core.config import settings, global_vars
from app.core.event import eventmanager, Event
from app.core.event import eventmanager
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.sites import SitesHelper # noqa
from app.helper.wallpaper import WallpaperHelper
from app.log import logger
from app.schemas import Notification, NotificationType, Workflow, ConfigChangeEventData
from app.schemas import Notification, NotificationType, Workflow
from app.schemas.types import EventType, SystemConfigKey
from app.utils.gc import get_memory_usage
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import SingletonClass
from app.utils.timer import TimerUtils
@@ -42,10 +43,20 @@ class SchedulerChain(ChainBase):
pass
class Scheduler(metaclass=SingletonClass):
class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"""
定时任务管理
"""
CONFIG_WATCH = {
"DEV",
"COOKIECLOUD_INTERVAL",
"MEDIASERVER_SYNC_INTERVAL",
"SUBSCRIBE_SEARCH",
"SUBSCRIBE_SEARCH_INTERVAL",
"SUBSCRIBE_MODE",
"SUBSCRIBE_RSS_INTERVAL",
"SITEDATA_REFRESH_INTERVAL",
}
def __init__(self):
# 定时服务
@@ -63,22 +74,12 @@ class Scheduler(metaclass=SingletonClass):
# 初始化
self.init()
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['DEV', 'COOKIECLOUD_INTERVAL', 'MEDIASERVER_SYNC_INTERVAL', 'SUBSCRIBE_SEARCH',
'SUBSCRIBE_SEARCH_INTERVAL', 'SUBSCRIBE_MODE', 'SUBSCRIBE_RSS_INTERVAL',
'SITEDATA_REFRESH_INTERVAL']:
return
logger.info(f"配置项 {event_data.key} 变更,重新初始化定时服务...")
def on_config_changed(self):
self.init()
def get_reload_name(self):
return "定时服务"
def init(self):
"""
初始化定时服务

View File

@@ -1,7 +1,7 @@
from pathlib import Path
from typing import Optional, Dict, Any, List, Set, Callable
from typing import Iterable, Optional, Dict, Any, List, Set, Callable
from pydantic import BaseModel, Field, model_validator
from pydantic import BaseModel, Field, field_validator, model_validator
from app.schemas.message import MessageChannel
from app.schemas.file import FileItem
@@ -27,10 +27,29 @@ class ConfigChangeEventData(BaseEventData):
"""
ConfigChange 事件的数据模型
"""
key: str = Field(..., description="配置项的键")
key: set[str] = Field(..., description="配置项的键(集合类型)")
value: Optional[Any] = Field(default=None, description="配置项的新值")
change_type: str = Field(default="update", description="配置项的变更类型,如 'add', 'update', 'delete'")
@field_validator('key', mode='before')
@classmethod
def convert_to_set(cls, v):
"""将输入的 str、list、dict.keys() 等转为 set"""
if v is None:
return set()
elif isinstance(v, str):
return {v}
elif isinstance(v, dict):
return set(str(k) for k in v.keys())
elif isinstance(v, (list, tuple)):
return set(str(item) for item in v)
elif isinstance(v, set):
return set(str(item) for item in v)
elif isinstance(v, Iterable):
return set(str(item) for item in v)
else:
return {str(v)}
class ChainEventData(BaseEventData):
"""

66
app/utils/mixins.py Normal file
View File

@@ -0,0 +1,66 @@
import inspect
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas.types import EventType
class ConfigReloadMixin:
"""配置重载混入类
继承此 Mixin 类的类,会在配置变更时自动调用 on_config_changed 方法。
在类中定义 CONFIG_WATCH 集合,指定需要监听的配置项
重写 on_config_changed 方法实现具体的重载逻辑
可选地重写 get_reload_name 方法提供模块名称(用于日志显示)
"""
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
config_watch = getattr(cls, 'CONFIG_WATCH', None)
if not config_watch:
return
# 检查 on_config_changed 方法是否为异步
is_async = inspect.iscoroutinefunction(cls.on_config_changed)
method_name = 'handle_config_changed'
# 创建事件处理函数
def create_handler(is_async):
if is_async:
async def wrapper(self: ConfigReloadMixin, event: Event):
if not event:
return
changed_keys = getattr(event.event_data, "key", set()) & config_watch
if not changed_keys:
return
logger.info(f"配置 {', '.join(changed_keys)} 变更,重载 {self.get_reload_name()}...")
await self.on_config_changed()
else:
def wrapper(self: ConfigReloadMixin, event: Event):
if not event:
return
changed_keys = getattr(event.event_data, "key", set()) & config_watch
if not changed_keys:
return
logger.info(f"配置 {', '.join(changed_keys)} 变更,重载 {self.get_reload_name()}...")
self.on_config_changed()
return wrapper
# 创建并设置处理函数
handler = create_handler(is_async)
handler.__module__ = cls.__module__
handler.__qualname__ = f'{cls.__name__}.{method_name}'
setattr(cls, method_name, handler)
# 添加为事件处理器
eventmanager.add_event_listener(EventType.ConfigChanged, handler)
def on_config_changed(self):
"""子类重写此方法实现具体重载逻辑"""
pass
def get_reload_name(self):
"""功能/模块名称"""
return self.__class__.__name__