Merge pull request #2812 from InfinityPacer/feature/module

This commit is contained in:
jxxghp
2024-10-06 14:49:52 +08:00
committed by GitHub
12 changed files with 122 additions and 122 deletions

View File

@@ -1,5 +1,5 @@
from abc import abstractmethod, ABCMeta
from typing import Generic, Tuple, Union, TypeVar, Type, Dict, Optional, Callable, Any, List
from typing import Generic, Tuple, Union, TypeVar, Type, Dict, Optional, Callable, Any
from app.helper.serviceconfig import ServiceConfigHelper
from app.schemas import Notification, MessageChannel, NotificationConf, MediaServerConf, DownloaderConf
@@ -64,8 +64,9 @@ class ServiceBase(Generic[TService, TConf], metaclass=ABCMeta):
"""
初始化 ServiceBase 类的实例
"""
self._configs: Dict[str, TConf] = {}
self._instances: Dict[str, TService] = {}
self._configs: Optional[Dict[str, TConf]] = None
self._instances: Optional[Dict[str, TService]] = None
self._service_name: Optional[str] = None
def init_service(self, service_name: str,
service_type: Optional[Union[Type[TService], Callable[..., TService]]] = None):
@@ -75,20 +76,24 @@ class ServiceBase(Generic[TService, TConf], metaclass=ABCMeta):
:param service_name: 服务名称,作为配置匹配的依据
:param service_type: 服务的类型可以是类类型Type[TService]、工厂函数Callable或 None 来跳过实例化
"""
if not service_name:
raise Exception("service_name is null")
self._service_name = service_name
configs = self.get_configs()
if not configs:
if configs is None:
return
for conf in configs:
if conf.enabled and conf.type == service_name:
self._configs[conf.name] = conf
if service_type:
# 通过服务类型或工厂函数来创建实例
if isinstance(service_type, type):
# 如果传入的是类类型,调用构造函数实例化
self._instances[conf.name] = service_type(**conf.config)
else:
# 如果传入的是工厂函数,直接调用工厂函数
self._instances[conf.name] = service_type(conf)
self._configs = configs
self._instances = {}
if not service_type:
return
for conf in self._configs.values():
# 通过服务类型或工厂函数来创建实例
if isinstance(service_type, type):
# 如果传入的是类类型,调用构造函数实例化
self._instances[conf.name] = service_type(**conf.config)
else:
# 如果传入的是工厂函数,直接调用工厂函数
self._instances[conf.name] = service_type(conf)
def get_instances(self) -> Dict[str, TService]:
"""
@@ -96,7 +101,7 @@ class ServiceBase(Generic[TService, TConf], metaclass=ABCMeta):
:return: 返回服务实例列表
"""
return self._instances
return self._instances or {}
def get_instance(self, name: str) -> Optional[TService]:
"""
@@ -105,33 +110,29 @@ class ServiceBase(Generic[TService, TConf], metaclass=ABCMeta):
:param name: 实例名称
:return: 返回对应名称的服务实例,若不存在则返回 None
"""
if not name:
if not name or not self._instances:
return None
return self._instances.get(name)
@abstractmethod
def get_configs(self) -> List[TConf]:
def get_configs(self) -> Dict[str, TConf]:
"""
获取服务配置列表
获取已启用的服务配置字典
:return: 返回配置列表
:return: 返回配置字典
"""
pass
def get_config(self, name: str, ctype: str = None) -> Optional[TConf]:
def get_config(self, name: str) -> Optional[TConf]:
"""
获取配置,支持类型过滤
:param name: 配置名称
:param ctype: 配置类型,可选,默认不进行类型过滤
:return: 返回符合条件的配置,若不存在则返回 None
"""
if not name:
if not name or not self._configs:
return None
conf = self._configs.get(name)
if not ctype:
return conf
return conf if getattr(conf, "type", None) == ctype else None
return self._configs.get(name)
class _MessageBase(ServiceBase[TService, NotificationConf]):
@@ -146,13 +147,18 @@ class _MessageBase(ServiceBase[TService, NotificationConf]):
super().__init__()
self._channel: Optional[MessageChannel] = None
def get_configs(self) -> List[NotificationConf]:
def get_configs(self) -> Dict[str, NotificationConf]:
"""
获取消息通知渠道的配置
获取已启用的消息通知渠道的配置字典
:return: 返回消息通知的配置列表
:return: 返回消息通知的配置字典
"""
return ServiceConfigHelper.get_notification_configs()
if self._configs is not None:
return self._configs
configs = ServiceConfigHelper.get_notification_configs()
if not self._service_name:
return {}
return {conf.name: conf for conf in configs if conf.type == self._service_name and conf.enabled}
def check_message(self, message: Notification, source: str = None) -> bool:
"""
@@ -217,13 +223,18 @@ class _DownloaderBase(ServiceBase[TService, DownloaderConf]):
return self._instances.get(name)
return self._default_server
def get_configs(self) -> List[DownloaderConf]:
def get_configs(self) -> Dict[str, DownloaderConf]:
"""
获取下载器的配置
获取已启用的下载器的配置字典
:return: 返回下载器配置列表
:return: 返回下载器配置字典
"""
return ServiceConfigHelper.get_downloader_configs()
if self._configs is not None:
return self._configs
configs = ServiceConfigHelper.get_downloader_configs()
if not self._service_name:
return {}
return {conf.name: conf for conf in configs if conf.type == self._service_name and conf.enabled}
class _MediaServerBase(ServiceBase[TService, MediaServerConf]):
@@ -231,10 +242,15 @@ class _MediaServerBase(ServiceBase[TService, MediaServerConf]):
媒体服务器基类
"""
def get_configs(self) -> List[MediaServerConf]:
def get_configs(self) -> Dict[str, MediaServerConf]:
"""
获取媒体服务器的配置
获取已启用的媒体服务器的配置字典
:return: 返回媒体服务器配置列表
:return: 返回媒体服务器配置字典
"""
return ServiceConfigHelper.get_mediaserver_configs()
if self._configs is not None:
return self._configs
configs = ServiceConfigHelper.get_mediaserver_configs()
if not self._service_name:
return {}
return {conf.name: conf for conf in configs if conf.type == self._service_name and conf.enabled}

View File

@@ -29,9 +29,9 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.get_user():
@@ -46,7 +46,7 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
定时任务每10分钟调用一次
"""
# 定时重连
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"Emby服务器 {name} 连接断开,尝试重连 ...")
server.reconnect()
@@ -59,7 +59,7 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
:return: token or None
"""
# Emby认证
for server in self._instances.values():
for server in self.get_instances().values():
result = server.authenticate(name, password)
if result:
return result
@@ -75,18 +75,12 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
"""
source = args.get("source")
if source:
server_config: MediaServerConf = self.get_config(source, 'emby')
if not server_config:
return None
server: Emby = self.get_instance(source)
if not server:
return None
return server.get_webhook_message(form, args)
for conf in self._configs.values():
if conf.type != "emby":
continue
server = self.get_instance(conf.name)
for server in self.get_instances().values():
if server:
result = server.get_webhook_message(form, args)
if result:
@@ -100,7 +94,7 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
@@ -152,7 +146,7 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
return None
servers = [server]
else:
servers = self._instances.values()
servers = self.get_instances().values()
media_statistics = []
for server in servers:
media_statistic = server.get_medias_count()

View File

@@ -30,7 +30,7 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
定时任务每10分钟调用一次
"""
# 定时重连
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"Jellyfin {name} 服务器连接断开,尝试重连 ...")
server.reconnect()
@@ -42,9 +42,9 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.get_user():
@@ -59,7 +59,7 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
:return: Token or None
"""
# Jellyfin认证
for server in self._instances.values():
for server in self.get_instances().values():
result = server.authenticate(name, password)
if result:
return result
@@ -75,18 +75,12 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
"""
source = args.get("source")
if source:
server_config: MediaServerConf = self.get_config(source, 'jellyfin')
if not server_config:
return None
server: Jellyfin = self.get_instance(source)
if not server:
return None
return server.get_webhook_message(body)
for conf in self._configs.values():
if conf.type != "jellyfin":
continue
server = self.get_instance(conf.name)
for server in self.get_instances().values():
if server:
result = server.get_webhook_message(body)
if result:
@@ -100,7 +94,7 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
@@ -150,7 +144,7 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
return None
servers = [server]
else:
servers = self._instances.values()
servers = self.get_instances().values()
media_statistics = []
for server in servers:
media_statistic = server.get_medias_count()

View File

@@ -29,9 +29,9 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.get_librarys():
@@ -46,7 +46,7 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
定时任务每10分钟调用一次
"""
# 定时重连
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"Plex {name} 服务器连接断开,尝试重连 ...")
server.reconnect()
@@ -61,20 +61,14 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
"""
source = args.get("source")
if source:
server_config: MediaServerConf = self.get_config(source, 'plex')
if not server_config:
return None
server: Plex = self.get_instance(source)
if not server:
return None
return server.get_webhook_message(body)
return server.get_webhook_message(form)
for conf in self._configs.values():
if conf.type != "plex":
continue
server = self.get_instance(conf.name)
for server in self.get_instances().values():
if server:
result = server.get_webhook_message(body)
result = server.get_webhook_message(form)
if result:
return result
return None
@@ -86,7 +80,7 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
@@ -140,7 +134,7 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
return None
servers = [server]
else:
servers = self._instances.values()
servers = self.get_instances().values()
media_statistics = []
for server in servers:
media_statistic = server.get_medias_count()

View File

@@ -37,9 +37,9 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.transfer_info():
@@ -53,7 +53,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
"""
定时任务每10分钟调用一次
"""
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"Qbittorrent下载器 {name} 连接断开,尝试重连 ...")
server.reconnect()
@@ -337,7 +337,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
return None
servers = [server]
else:
servers = self._instances.values()
servers = self.get_instances().values()
# 调用Qbittorrent API查询实时信息
ret_info = []
for server in servers:

View File

@@ -27,16 +27,16 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
"""
停止模块
"""
for client in self._instances.values():
for client in self.get_instances().values():
client.stop()
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, client in self._instances.items():
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"Slack {name} 未就续"
@@ -168,7 +168,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
}
"""
# 获取客户端
client_config = self.get_config(source, 'slack')
client_config = self.get_config(source)
if not client_config:
return None
# 校验token
@@ -214,7 +214,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
:param message: 消息
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
@@ -236,7 +236,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
:param medias: 媒体信息
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: Slack = self.get_instance(conf.name)
@@ -250,7 +250,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
:param torrents: 种子信息
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: Slack = self.get_instance(conf.name)

View File

@@ -27,9 +27,9 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, client in self._instances.items():
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"Synology Chat {name} 未就续"
@@ -53,10 +53,12 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
"""
try:
# 来源
client_config = self.get_config(source, 'synologychat')
client_config = self.get_config(source)
if not client_config:
return None
client: SynologyChat = self.get_instance(source)
if not client:
return None
# 解析消息
message: dict = form
if not message:
@@ -85,7 +87,7 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
:param message: 消息体
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
@@ -107,7 +109,7 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
:param medias: 媒体列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: SynologyChat = self.get_instance(conf.name)
@@ -122,7 +124,7 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
:param torrents: 种子列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: SynologyChat = self.get_instance(conf.name)

View File

@@ -26,16 +26,16 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
"""
停止模块
"""
for client in self._instances.values():
for client in self.get_instances().values():
client.stop()
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, client in self._instances.items():
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"Telegram {name} 未就续"
@@ -81,7 +81,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
}
"""
# 获取渠道
client_config = self.get_config(source, 'telegram')
client_config = self.get_config(source)
if not client_config:
return None
client: Telegram = self.get_instance(source)
@@ -127,7 +127,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
:param message: 消息体
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
@@ -149,7 +149,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
:param medias: 媒体列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: Telegram = self.get_instance(conf.name)
@@ -164,7 +164,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
:param torrents: 种子列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: Telegram = self.get_instance(conf.name)
@@ -177,5 +177,5 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
注册命令,实现这个函数接收系统可用的命令菜单
:param commands: 命令字典
"""
for client in self._instances.values():
for client in self.get_instances().values():
client.register_commands(commands)

View File

@@ -37,9 +37,9 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
server.reconnect()
if not server.transfer_info():
@@ -54,7 +54,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
定时任务每10分钟调用一次
"""
# 定时重连
for name, server in self._instances.items():
for name, server in self.get_instances().items():
if server.is_inactive():
logger.info(f"Transmission下载器 {name} 连接断开,尝试重连 ...")
server.reconnect()
@@ -337,7 +337,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
return None
servers = [server]
else:
servers = self._instances.values()
servers = self.get_instances().values()
# 调用Qbittorrent API查询实时信息
ret_info = []
for server in servers:

View File

@@ -29,9 +29,9 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, client in self._instances.items():
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"VoceChat {name} 未就续"
@@ -70,7 +70,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
}
"""
# 获取渠道
client_config = self.get_config(source, 'vocechat')
client_config = self.get_config(source)
if not client_config:
return None
# 报文体
@@ -113,7 +113,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
:param message: 消息内容
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
@@ -132,7 +132,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
:param medias: 媒体列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: VoceChat = self.get_instance(conf.name)
@@ -148,7 +148,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
:param torrents: 种子列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets

View File

@@ -39,7 +39,7 @@ class WebPushModule(_ModuleBase, _MessageBase):
:param message: 消息内容
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
webpush_users = conf.config.get("WEBPUSH_USERNAME") or ""

View File

@@ -30,9 +30,9 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
"""
测试模块连接性
"""
if not self._instances:
if not self.get_instances():
return None
for name, client in self._instances.items():
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"企业微信 {name} 未就续"
@@ -56,7 +56,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
"""
try:
# 获取客户端
client_config = self.get_config(source, 'wechat')
client_config = self.get_config(source)
if not client_config:
return None
client: WeChat = self.get_instance(source)
@@ -150,7 +150,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
:param message: 消息内容
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
@@ -172,7 +172,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
:param medias: 媒体列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: WeChat = self.get_instance(conf.name)
@@ -189,7 +189,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
:param torrents: 种子列表
:return: 成功或失败
"""
for conf in self._configs.values():
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: WeChat = self.get_instance(conf.name)
@@ -202,5 +202,5 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
注册命令,实现这个函数接收系统可用的命令菜单
:param commands: 命令字典
"""
for client in self._instances.values():
for client in self.get_instances().values():
client.create_menus(commands)