diff --git a/app/api/endpoints/login.py b/app/api/endpoints/login.py index 26b181ed..702fea26 100644 --- a/app/api/endpoints/login.py +++ b/app/api/endpoints/login.py @@ -5,9 +5,7 @@ from fastapi import APIRouter, Depends, Form, HTTPException from fastapi.security import OAuth2PasswordRequestForm from app import schemas -from app.chain.tmdb import TmdbChain from app.chain.user import UserChain -from app.chain.mediaserver import MediaServerChain from app.core import security from app.core.config import settings from app.helper.sites import SitesHelper @@ -54,14 +52,7 @@ def wallpaper() -> Any: """ 获取登录页面电影海报 """ - if settings.WALLPAPER == "bing": - url = WallpaperHelper().get_bing_wallpaper() - elif settings.WALLPAPER == "mediaserver": - url = MediaServerChain().get_latest_wallpaper() - elif settings.WALLPAPER == "customize": - url = WallpaperHelper().get_customize_wallpaper() - else: - url = TmdbChain().get_random_wallpager() + url = WallpaperHelper().get_wallpaper() if url: return schemas.Response( success=True, @@ -75,13 +66,4 @@ def wallpapers() -> Any: """ 获取登录页面电影海报 """ - if settings.WALLPAPER == "bing": - return WallpaperHelper().get_bing_wallpapers() - elif settings.WALLPAPER == "mediaserver": - return MediaServerChain().get_latest_wallpapers() - elif settings.WALLPAPER == "tmdb": - return TmdbChain().get_trending_wallpapers() - elif settings.WALLPAPER == "customize": - return WallpaperHelper().get_customize_wallpapers() - else: - return [] + return WallpaperHelper().get_wallpapers() diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 317e5ad3..4cda6a46 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -43,7 +43,6 @@ class ChainBase(metaclass=ABCMeta): self.messagequeue = MessageQueueManager( send_callback=self.run_module ) - self.useroper = UserOper() self.pluginmanager = PluginManager() @staticmethod @@ -575,26 +574,27 @@ class ChainBase(metaclass=ABCMeta): # 是否已发送管理员标志 admin_sended = False send_orignal = False + useroper = 
UserOper() for action in actions: send_message = copy.deepcopy(message) if action == "admin" and not admin_sended: # 仅发送管理员 logger.info(f"{send_message.mtype} 的消息已设置发送给管理员") # 读取管理员消息IDS - send_message.targets = self.useroper.get_settings(settings.SUPERUSER) + send_message.targets = useroper.get_settings(settings.SUPERUSER) admin_sended = True elif action == "user" and send_message.username: # 发送对应用户 logger.info(f"{send_message.mtype} 的消息已设置发送给用户 {send_message.username}") # 读取用户消息IDS - send_message.targets = self.useroper.get_settings(send_message.username) + send_message.targets = useroper.get_settings(send_message.username) if send_message.targets is None: # 没有找到用户 if not admin_sended: # 回滚发送管理员 logger.info(f"用户 {send_message.username} 不存在,消息将发送给管理员") # 读取管理员消息IDS - send_message.targets = self.useroper.get_settings(settings.SUPERUSER) + send_message.targets = useroper.get_settings(settings.SUPERUSER) admin_sended = True else: # 管理员发过了,此消息不发了 diff --git a/app/chain/bangumi.py b/app/chain/bangumi.py index 3da261f7..8c6e8f85 100644 --- a/app/chain/bangumi.py +++ b/app/chain/bangumi.py @@ -3,12 +3,11 @@ from typing import Optional, List from app import schemas from app.chain import ChainBase from app.core.context import MediaInfo -from app.utils.singleton import Singleton -class BangumiChain(ChainBase, metaclass=Singleton): +class BangumiChain(ChainBase): """ - Bangumi处理链,单例运行 + Bangumi处理链 """ def calendar(self) -> Optional[List[MediaInfo]]: diff --git a/app/chain/dashboard.py b/app/chain/dashboard.py index e0a945ef..06b292b3 100644 --- a/app/chain/dashboard.py +++ b/app/chain/dashboard.py @@ -2,10 +2,9 @@ from typing import Optional, List from app import schemas from app.chain import ChainBase -from app.utils.singleton import Singleton -class DashboardChain(ChainBase, metaclass=Singleton): +class DashboardChain(ChainBase): """ 各类仪表板统计处理链 """ diff --git a/app/chain/douban.py b/app/chain/douban.py index cb0da130..f8b0bd6c 100644 --- a/app/chain/douban.py +++ 
b/app/chain/douban.py @@ -4,12 +4,11 @@ from app import schemas from app.chain import ChainBase from app.core.context import MediaInfo from app.schemas import MediaType -from app.utils.singleton import Singleton -class DoubanChain(ChainBase, metaclass=Singleton): +class DoubanChain(ChainBase): """ - 豆瓣处理链,单例运行 + 豆瓣处理链 """ def person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]: diff --git a/app/chain/download.py b/app/chain/download.py index 825b2a32..3d5404c6 100644 --- a/app/chain/download.py +++ b/app/chain/download.py @@ -16,7 +16,6 @@ from app.core.metainfo import MetaInfo from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.mediaserver_oper import MediaServerOper from app.helper.directory import DirectoryHelper -from app.helper.message import MessageHelper from app.helper.torrent import TorrentHelper from app.log import logger from app.schemas import ExistMediaInfo, NotExistMediaInfo, DownloadingTorrent, Notification, ResourceSelectionEventData, \ @@ -32,14 +31,6 @@ class DownloadChain(ChainBase): 下载处理链 """ - def __init__(self): - super().__init__() - self.torrent = TorrentHelper() - self.downloadhis = DownloadHistoryOper() - self.mediaserver = MediaServerOper() - self.directoryhelper = DirectoryHelper() - self.messagehelper = MessageHelper() - def download_torrent(self, torrent: TorrentInfo, channel: MessageChannel = None, source: Optional[str] = None, @@ -122,7 +113,7 @@ class DownloadChain(ChainBase): logger.error(f"{torrent.title} 无法获取下载地址:{torrent.enclosure}!") return None, "", [] # 下载种子文件 - torrent_file, content, download_folder, files, error_msg = self.torrent.download_torrent( + torrent_file, content, download_folder, files, error_msg = TorrentHelper().download_torrent( url=torrent_url, cookie=site_cookie, ua=torrent.site_ua or settings.USER_AGENT, @@ -220,7 +211,7 @@ class DownloadChain(ChainBase): else: content = torrent_file # 获取种子文件的文件夹名和文件清单 - _folder_name, _file_list = 
self.torrent.get_torrent_info(torrent_file) + _folder_name, _file_list = TorrentHelper().get_torrent_info(torrent_file) # 下载目录 if save_path: @@ -228,7 +219,7 @@ class DownloadChain(ChainBase): download_dir = Path(save_path) else: # 根据媒体信息查询下载目录配置 - dir_info = self.directoryhelper.get_dir(_media, storage="local", include_unsorted=True) + dir_info = DirectoryHelper().get_dir(_media, storage="local", include_unsorted=True) # 拼装子目录 if dir_info: # 一级目录 @@ -278,7 +269,8 @@ class DownloadChain(ChainBase): _save_path = download_dir if _layout == "NoSubfolder" or not _folder_name else download_path # 登记下载记录 - self.downloadhis.add( + downloadhis = DownloadHistoryOper() + downloadhis.add( path=str(download_path), type=_media.type.value, title=_media.title, @@ -326,7 +318,7 @@ class DownloadChain(ChainBase): "torrentname": _meta.org_string, }) if files_to_add: - self.downloadhis.add_files(files_to_add) + downloadhis.add_files(files_to_add) # 下载成功发送消息 self.post_message( @@ -553,7 +545,7 @@ class DownloadChain(ChainBase): if isinstance(content, str): logger.warn(f"{meta.org_string} 下载地址是磁力链,无法确定种子文件集数") continue - torrent_episodes = self.torrent.get_torrent_episodes(torrent_files) + torrent_episodes = TorrentHelper().get_torrent_episodes(torrent_files) logger.info(f"{meta.org_string} 解析种子文件集数为 {torrent_episodes}") if not torrent_episodes: continue @@ -759,7 +751,7 @@ class DownloadChain(ChainBase): logger.warn(f"{meta.org_string} 下载地址是磁力链,无法解析种子文件集数") continue # 种子全部集 - torrent_episodes = self.torrent.get_torrent_episodes(torrent_files) + torrent_episodes = TorrentHelper().get_torrent_episodes(torrent_files) logger.info(f"{torrent.site_name} - {meta.org_string} 解析种子文件集数:{torrent_episodes}") # 选中的集 selected_episodes = set(torrent_episodes).intersection(set(need_episodes)) @@ -848,11 +840,13 @@ class DownloadChain(ChainBase): if not totals: totals = {} + mediaserver = MediaServerOper() + if mediainfo.type == MediaType.MOVIE: # 电影 - itemid = 
self.mediaserver.get_item_id(mtype=mediainfo.type.value, - title=mediainfo.title, - tmdbid=mediainfo.tmdb_id) + itemid = mediaserver.get_item_id(mtype=mediainfo.type.value, + title=mediainfo.title, + tmdbid=mediainfo.tmdb_id) exists_movies: Optional[ExistMediaInfo] = self.media_exists(mediainfo=mediainfo, itemid=itemid) if exists_movies: logger.info(f"媒体库中已存在电影:{mediainfo.title_year}") @@ -872,10 +866,10 @@ class DownloadChain(ChainBase): logger.error(f"媒体信息中没有季集信息:{mediainfo.title_year}") return False, {} # 电视剧 - itemid = self.mediaserver.get_item_id(mtype=mediainfo.type.value, - title=mediainfo.title, - tmdbid=mediainfo.tmdb_id, - season=mediainfo.season) + itemid = mediaserver.get_item_id(mtype=mediainfo.type.value, + title=mediainfo.title, + tmdbid=mediainfo.tmdb_id, + season=mediainfo.season) # 媒体库已存在的剧集 exists_tvs: Optional[ExistMediaInfo] = self.media_exists(mediainfo=mediainfo, itemid=itemid) if not exists_tvs: @@ -974,7 +968,7 @@ class DownloadChain(ChainBase): return [] ret_torrents = [] for torrent in torrents: - history = self.downloadhis.get_by_hash(torrent.hash) + history = DownloadHistoryOper().get_by_hash(torrent.hash) if history: # 媒体信息 torrent.media = { diff --git a/app/chain/media.py b/app/chain/media.py index b4a3ef7a..fdd54cec 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -14,7 +14,6 @@ from app.log import logger from app.schemas import FileItem from app.schemas.types import EventType, MediaType, ChainEventType from app.utils.http import RequestUtils -from app.utils.singleton import Singleton from app.utils.string import StringUtils recognize_lock = Lock() @@ -22,15 +21,11 @@ scraping_lock = Lock() scraping_files = [] -class MediaChain(ChainBase, metaclass=Singleton): +class MediaChain(ChainBase): """ 媒体信息处理链,单例运行 """ - def __init__(self): - super().__init__() - self.storagechain = StorageChain() - def metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo, season: Optional[int] = None, episode: Optional[int] = None) -> 
Optional[str]: """ @@ -337,6 +332,8 @@ class MediaChain(ChainBase, metaclass=Singleton): :param overwrite: 是否覆盖已有文件 """ + storagechain = StorageChain() + def is_bluray_folder(_fileitem: schemas.FileItem) -> bool: """ 判断是否为原盘目录 @@ -346,7 +343,7 @@ class MediaChain(ChainBase, metaclass=Singleton): # 蓝光原盘目录必备的文件或文件夹 required_files = ['BDMV', 'CERTIFICATE'] # 检查目录下是否存在所需文件或文件夹 - for item in self.storagechain.list_files(_fileitem): + for item in storagechain.list_files(_fileitem): if item.name in required_files: return True return False @@ -355,7 +352,7 @@ class MediaChain(ChainBase, metaclass=Singleton): """ 列出下级文件 """ - return self.storagechain.list_files(fileitem=_fileitem) + return storagechain.list_files(fileitem=_fileitem) def __save_file(_fileitem: schemas.FileItem, _path: Path, _content: Union[bytes, str]): """ @@ -371,7 +368,7 @@ class MediaChain(ChainBase, metaclass=Singleton): tmp_file.write_bytes(_content) # 获取文件的父目录 try: - item = self.storagechain.upload_file(fileitem=_fileitem, path=tmp_file, new_name=_path.name) + item = storagechain.upload_file(fileitem=_fileitem, path=tmp_file, new_name=_path.name) if item: logger.info(f"已保存文件:{item.path}") else: @@ -413,7 +410,7 @@ class MediaChain(ChainBase, metaclass=Singleton): if fileitem.type == "file": # 是否已存在 nfo_path = filepath.with_suffix(".nfo") - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): # 电影文件 movie_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo) if movie_nfo: @@ -428,7 +425,7 @@ class MediaChain(ChainBase, metaclass=Singleton): if is_bluray_folder(fileitem): # 原盘目录 nfo_path = filepath / (filepath.name + ".nfo") - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): # 生成原盘nfo movie_nfo = self.metadata_nfo(meta=meta, 
mediainfo=mediainfo) if movie_nfo: @@ -453,8 +450,8 @@ class MediaChain(ChainBase, metaclass=Singleton): if image_dict: for image_name, image_url in image_dict.items(): image_path = filepath.with_name(image_name) - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, - path=image_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, + path=image_path): # 下载图片 content = __download_image(image_url) # 写入图片到当前目录 @@ -477,7 +474,7 @@ class MediaChain(ChainBase, metaclass=Singleton): return # 是否已存在 nfo_path = filepath.with_suffix(".nfo") - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): # 获取集的nfo文件 episode_nfo = self.metadata_nfo(meta=file_meta, mediainfo=file_mediainfo, season=file_meta.begin_season, @@ -485,7 +482,7 @@ class MediaChain(ChainBase, metaclass=Singleton): if episode_nfo: # 保存或上传nfo文件到上级目录 if not parent: - parent = self.storagechain.get_parent_item(fileitem) + parent = storagechain.get_parent_item(fileitem) __save_file(_fileitem=parent, _path=nfo_path, _content=episode_nfo) else: logger.warn(f"{filepath.name} nfo文件生成失败!") @@ -497,13 +494,13 @@ class MediaChain(ChainBase, metaclass=Singleton): if image_dict: for episode, image_url in image_dict.items(): image_path = filepath.with_suffix(Path(image_url).suffix) - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=image_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path): # 下载图片 content = __download_image(image_url) # 保存图片文件到当前目录 if content: if not parent: - parent = self.storagechain.get_parent_item(fileitem) + parent = storagechain.get_parent_item(fileitem) __save_file(_fileitem=parent, _path=image_path, _content=content) else: logger.info(f"已存在图片文件:{image_path}") @@ -526,7 +523,7 @@ class MediaChain(ChainBase, metaclass=Singleton): if 
season_meta.begin_season is not None: # 是否已存在 nfo_path = filepath / "season.nfo" - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): # 当前目录有季号,生成季nfo season_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo, season=season_meta.begin_season) @@ -542,14 +539,14 @@ class MediaChain(ChainBase, metaclass=Singleton): if image_dict: for image_name, image_url in image_dict.items(): image_path = filepath.with_name(image_name) - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, - path=image_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, + path=image_path): # 下载图片 content = __download_image(image_url) # 保存图片文件到剧集目录 if content: if not parent: - parent = self.storagechain.get_parent_item(fileitem) + parent = storagechain.get_parent_item(fileitem) __save_file(_fileitem=parent, _path=image_path, _content=content) else: logger.info(f"已存在图片文件:{image_path}") @@ -564,14 +561,14 @@ class MediaChain(ChainBase, metaclass=Singleton): if image_season != str(season_meta.begin_season).rjust(2, '0'): logger.info(f"当前刮削季为:{season_meta.begin_season},跳过文件:{image_path}") continue - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, - path=image_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, + path=image_path): # 下载图片 content = __download_image(image_url) # 保存图片文件到当前目录 if content: if not parent: - parent = self.storagechain.get_parent_item(fileitem) + parent = storagechain.get_parent_item(fileitem) __save_file(_fileitem=parent, _path=image_path, _content=content) else: logger.info(f"已存在图片文件:{image_path}") @@ -579,7 +576,7 @@ class MediaChain(ChainBase, metaclass=Singleton): if not season_meta.season: # 是否已存在 nfo_path = filepath / "tvshow.nfo" - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): 
+ if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path): # 当前目录有名称,生成tvshow nfo 和 tv图片 tv_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo) if tv_nfo: @@ -597,8 +594,8 @@ class MediaChain(ChainBase, metaclass=Singleton): if image_name.startswith("season"): continue image_path = filepath / image_name - if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, - path=image_path): + if overwrite or not storagechain.get_file_item(storage=fileitem.storage, + path=image_path): # 下载图片 content = __download_image(image_url) # 保存图片文件到当前目录 diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index 55689264..5c31ca40 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -17,10 +17,6 @@ class MediaServerChain(ChainBase): 媒体服务器处理链 """ - def __init__(self): - super().__init__() - self.dboper = MediaServerOper() - def librarys(self, server: str, username: Optional[str] = None, hidden: bool = False) -> List[MediaServerLibrary]: """ @@ -131,7 +127,8 @@ class MediaServerChain(ChainBase): # 汇总统计 total_count = 0 # 清空登记薄 - self.dboper.empty() + dboper = MediaServerOper() + dboper.empty() # 遍历媒体服务器 for mediaserver in mediaservers: if not mediaserver: @@ -175,7 +172,7 @@ class MediaServerChain(ChainBase): item_dict = item.dict() item_dict["seasoninfo"] = seasoninfo item_dict["item_type"] = item_type - self.dboper.add(**item_dict) + dboper.add(**item_dict) logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}") # 总数累加 total_count += library_count diff --git a/app/chain/message.py b/app/chain/message.py index e2f58402..10276afc 100644 --- a/app/chain/message.py +++ b/app/chain/message.py @@ -9,10 +9,8 @@ from app.chain.search import SearchChain from app.chain.subscribe import SubscribeChain from app.core.config import settings from app.core.context import MediaInfo, Context -from app.core.event import EventManager from app.core.meta import MetaBase -from app.db.message_oper import 
MessageOper -from app.helper.message import MessageHelper +from app.db.user_oper import UserOper from app.helper.torrent import TorrentHelper from app.log import logger from app.schemas import Notification, NotExistMediaInfo, CommingMessage @@ -36,19 +34,8 @@ class MessageChain(ChainBase): # 每页数据量 _page_size: int = 8 - def __init__(self): - super().__init__() - self.downloadchain = DownloadChain() - self.subscribechain = SubscribeChain() - self.searchchain = SearchChain() - self.mediachain = MediaChain() - self.eventmanager = EventManager() - self.torrenthelper = TorrentHelper() - self.messagehelper = MessageHelper() - self.messageoper = MessageOper() - + @staticmethod def __get_noexits_info( - self, _meta: MetaBase, _mediainfo: MediaInfo) -> Dict[Union[int, str], Dict[int, NotExistMediaInfo]]: """ @@ -57,10 +44,10 @@ class MessageChain(ChainBase): if _mediainfo.type == MediaType.TV: if not _mediainfo.seasons: # 补充媒体信息 - _mediainfo = self.mediachain.recognize_media(mtype=_mediainfo.type, - tmdbid=_mediainfo.tmdb_id, - doubanid=_mediainfo.douban_id, - cache=False) + _mediainfo = MediaChain().recognize_media(mtype=_mediainfo.type, + tmdbid=_mediainfo.tmdb_id, + doubanid=_mediainfo.douban_id, + cache=False) if not _mediainfo: logger.warn(f"{_mediainfo.tmdb_id or _mediainfo.douban_id} 媒体信息识别失败!") return {} @@ -193,8 +180,8 @@ class MessageChain(ChainBase): mediainfo: MediaInfo = cache_list[_choice] _current_media = mediainfo # 查询缺失的媒体信息 - exist_flag, no_exists = self.downloadchain.get_no_exists_info(meta=_current_meta, - mediainfo=_current_media) + exist_flag, no_exists = DownloadChain().get_no_exists_info(meta=_current_meta, + mediainfo=_current_media) if exist_flag and cache_type == "Search": # 媒体库中已存在 self.post_message( @@ -234,8 +221,8 @@ class MessageChain(ChainBase): title=f"开始搜索 {mediainfo.type.value} {mediainfo.title_year} ...", userid=userid)) # 开始搜索 - contexts = self.searchchain.process(mediainfo=mediainfo, - no_exists=no_exists) + contexts = 
SearchChain().process(mediainfo=mediainfo, + no_exists=no_exists) if not contexts: # 没有数据 self.post_message(Notification( @@ -246,7 +233,7 @@ class MessageChain(ChainBase): userid=userid)) return # 搜索结果排序 - contexts = self.torrenthelper.sort_torrents(contexts) + contexts = TorrentHelper().sort_torrents(contexts) # 判断是否设置自动下载 auto_download_user = settings.AUTO_DOWNLOAD_USER # 匹配到自动下载用户 @@ -287,8 +274,8 @@ class MessageChain(ChainBase): best_version = False # 查询缺失的媒体信息 if cache_type == "Subscribe": - exist_flag, _ = self.downloadchain.get_no_exists_info(meta=_current_meta, - mediainfo=mediainfo) + exist_flag, _ = DownloadChain().get_no_exists_info(meta=_current_meta, + mediainfo=mediainfo) if exist_flag: self.post_message(Notification( channel=channel, @@ -300,18 +287,18 @@ class MessageChain(ChainBase): else: best_version = True # 转换用户名 - mp_name = self.useroper.get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None + mp_name = UserOper().get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None # 添加订阅,状态为N - self.subscribechain.add(title=mediainfo.title, - year=mediainfo.year, - mtype=mediainfo.type, - tmdbid=mediainfo.tmdb_id, - season=_current_meta.begin_season, - channel=channel, - source=source, - userid=userid, - username=mp_name or username, - best_version=best_version) + SubscribeChain().add(title=mediainfo.title, + year=mediainfo.year, + mtype=mediainfo.type, + tmdbid=mediainfo.tmdb_id, + season=_current_meta.begin_season, + channel=channel, + source=source, + userid=userid, + username=mp_name or username, + best_version=best_version) elif cache_type == "Torrent": if int(text) == 0: # 自动选择下载,强制下载模式 @@ -324,8 +311,8 @@ class MessageChain(ChainBase): # 下载种子 context: Context = cache_list[_choice] # 下载 - self.downloadchain.download_single(context, channel=channel, source=source, - userid=userid, username=username) + DownloadChain().download_single(context, channel=channel, source=source, + userid=userid, username=username) 
elif text.lower() == "p": # 上一页 @@ -444,7 +431,7 @@ class MessageChain(ChainBase): if action in ["Search", "ReSearch", "Subscribe", "ReSubscribe"]: # 搜索 - meta, medias = self.mediachain.search(content) + meta, medias = MediaChain().search(content) # 识别 if not meta.name: self.post_message(Notification( @@ -497,9 +484,10 @@ class MessageChain(ChainBase): """ 自动择优下载 """ + downloadchain = DownloadChain() if no_exists is None: # 查询缺失的媒体信息 - exist_flag, no_exists = self.downloadchain.get_no_exists_info( + exist_flag, no_exists = downloadchain.get_no_exists_info( meta=_current_meta, mediainfo=_current_media ) @@ -508,12 +496,12 @@ class MessageChain(ChainBase): no_exists = self.__get_noexits_info(_current_meta, _current_media) # 批量下载 - downloads, lefts = self.downloadchain.batch_download(contexts=cache_list, - no_exists=no_exists, - channel=channel, - source=source, - userid=userid, - username=username) + downloads, lefts = downloadchain.batch_download(contexts=cache_list, + no_exists=no_exists, + channel=channel, + source=source, + userid=userid, + username=username) if downloads and not lefts: # 全部下载完成 logger.info(f'{_current_media.title_year} 下载完成') @@ -528,19 +516,19 @@ class MessageChain(ChainBase): else: note = None # 转换用户名 - mp_name = self.useroper.get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None + mp_name = UserOper().get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None # 添加订阅,状态为R - self.subscribechain.add(title=_current_media.title, - year=_current_media.year, - mtype=_current_media.type, - tmdbid=_current_media.tmdb_id, - season=_current_meta.begin_season, - channel=channel, - source=source, - userid=userid, - username=mp_name or username, - state="R", - note=note) + SubscribeChain().add(title=_current_media.title, + year=_current_media.year, + mtype=_current_media.type, + tmdbid=_current_media.tmdb_id, + season=_current_meta.begin_season, + channel=channel, + source=source, + userid=userid, + username=mp_name 
or username, + state="R", + note=note) def __post_medias_message(self, channel: MessageChannel, source: str, title: str, items: list, userid: str, total: int): diff --git a/app/chain/recommend.py b/app/chain/recommend.py index 7d2241f1..f5d59fc0 100644 --- a/app/chain/recommend.py +++ b/app/chain/recommend.py @@ -29,12 +29,8 @@ class RecommendChain(ChainBase, metaclass=Singleton): 推荐处理链,单例运行 """ - def __init__(self): - super().__init__() - self.tmdbchain = TmdbChain() - self.doubanchain = DoubanChain() - self.bangumichain = BangumiChain() - self.cache_max_pages = 5 + # 推荐数据的缓存页数 + cache_max_pages = 5 def refresh_recommend(self): """ @@ -174,16 +170,16 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ TMDB热门电影 """ - movies = self.tmdbchain.tmdb_discover(mtype=MediaType.MOVIE, - sort_by=sort_by, - with_genres=with_genres, - with_original_language=with_original_language, - with_keywords=with_keywords, - with_watch_providers=with_watch_providers, - vote_average=vote_average, - vote_count=vote_count, - release_date=release_date, - page=page) + movies = TmdbChain().tmdb_discover(mtype=MediaType.MOVIE, + sort_by=sort_by, + with_genres=with_genres, + with_original_language=with_original_language, + with_keywords=with_keywords, + with_watch_providers=with_watch_providers, + vote_average=vote_average, + vote_count=vote_count, + release_date=release_date, + page=page) return [movie.to_dict() for movie in movies] if movies else [] @log_execution_time(logger=logger) @@ -200,16 +196,16 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ TMDB热门电视剧 """ - tvs = self.tmdbchain.tmdb_discover(mtype=MediaType.TV, - sort_by=sort_by, - with_genres=with_genres, - with_original_language=with_original_language, - with_keywords=with_keywords, - with_watch_providers=with_watch_providers, - vote_average=vote_average, - vote_count=vote_count, - release_date=release_date, - page=page) + tvs = TmdbChain().tmdb_discover(mtype=MediaType.TV, + sort_by=sort_by, + 
with_genres=with_genres, + with_original_language=with_original_language, + with_keywords=with_keywords, + with_watch_providers=with_watch_providers, + vote_average=vote_average, + vote_count=vote_count, + release_date=release_date, + page=page) return [tv.to_dict() for tv in tvs] if tvs else [] @log_execution_time(logger=logger) @@ -218,7 +214,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ TMDB流行趋势 """ - infos = self.tmdbchain.tmdb_trending(page=page) + infos = TmdbChain().tmdb_trending(page=page) return [info.to_dict() for info in infos] if infos else [] @log_execution_time(logger=logger) @@ -227,7 +223,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ Bangumi每日放送 """ - medias = self.bangumichain.calendar() + medias = BangumiChain().calendar() return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else [] @log_execution_time(logger=logger) @@ -236,7 +232,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣正在热映 """ - movies = self.doubanchain.movie_showing(page=page, count=count) + movies = DoubanChain().movie_showing(page=page, count=count) return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) @@ -246,8 +242,8 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣最新电影 """ - movies = self.doubanchain.douban_discover(mtype=MediaType.MOVIE, - sort=sort, tags=tags, page=page, count=count) + movies = DoubanChain().douban_discover(mtype=MediaType.MOVIE, + sort=sort, tags=tags, page=page, count=count) return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) @@ -257,8 +253,8 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣最新电视剧 """ - tvs = self.doubanchain.douban_discover(mtype=MediaType.TV, - sort=sort, tags=tags, page=page, count=count) + tvs = DoubanChain().douban_discover(mtype=MediaType.TV, + sort=sort, tags=tags, page=page, count=count) return [media.to_dict() for media in tvs] if 
tvs else [] @log_execution_time(logger=logger) @@ -267,7 +263,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣电影TOP250 """ - movies = self.doubanchain.movie_top250(page=page, count=count) + movies = DoubanChain().movie_top250(page=page, count=count) return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) @@ -276,7 +272,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣国产剧集榜 """ - tvs = self.doubanchain.tv_weekly_chinese(page=page, count=count) + tvs = DoubanChain().tv_weekly_chinese(page=page, count=count) return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) @@ -285,7 +281,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣全球剧集榜 """ - tvs = self.doubanchain.tv_weekly_global(page=page, count=count) + tvs = DoubanChain().tv_weekly_global(page=page, count=count) return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) @@ -294,7 +290,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣热门动漫 """ - tvs = self.doubanchain.tv_animation(page=page, count=count) + tvs = DoubanChain().tv_animation(page=page, count=count) return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) @@ -303,7 +299,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣热门电影 """ - movies = self.doubanchain.movie_hot(page=page, count=count) + movies = DoubanChain().movie_hot(page=page, count=count) return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) @@ -312,5 +308,5 @@ class RecommendChain(ChainBase, metaclass=Singleton): """ 豆瓣热门电视剧 """ - tvs = self.doubanchain.tv_hot(page=page, count=count) + tvs = DoubanChain().tv_hot(page=page, count=count) return [media.to_dict() for media in tvs] if tvs else [] diff --git a/app/chain/search.py b/app/chain/search.py index 36fdc416..5f7f0a87 100644 --- a/app/chain/search.py +++ b/app/chain/search.py @@ 
-27,13 +27,6 @@ class SearchChain(ChainBase): __result_temp_file = "__search_result__" - def __init__(self): - super().__init__() - self.siteshelper = SitesHelper() - self.progress = ProgressHelper() - self.systemconfig = SystemConfigOper() - self.torrenthelper = TorrentHelper() - def search_by_id(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None, mtype: MediaType = None, area: Optional[str] = "title", season: Optional[int] = None, sites: List[int] = None, cache_local: bool = False) -> List[Context]: @@ -184,19 +177,20 @@ class SearchChain(ChainBase): return [] # 开始新进度 - self.progress.start(ProgressKey.Search) + progress = ProgressHelper() + progress.start(ProgressKey.Search) # 开始过滤 - self.progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...', - key=ProgressKey.Search) + progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...', + key=ProgressKey.Search) # 匹配订阅附加参数 if filter_params: logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...') - torrents = [torrent for torrent in torrents if self.torrenthelper.filter_torrent(torrent, filter_params)] + torrents = [torrent for torrent in torrents if TorrentHelper().filter_torrent(torrent, filter_params)] # 开始过滤规则过滤 if rule_groups is None: # 取搜索过滤规则 - rule_groups: List[str] = self.systemconfig.get(SystemConfigKey.SearchFilterRuleGroups) + rule_groups: List[str] = SystemConfigOper().get(SystemConfigKey.SearchFilterRuleGroups) if rule_groups: logger.info(f'开始过滤规则/剧集过滤,使用规则组:{rule_groups} ...') torrents = __do_filter(torrents) @@ -206,7 +200,7 @@ class SearchChain(ChainBase): logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源") # 过滤完成 - self.progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search) + progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search) # 开始匹配 _match_torrents = [] @@ -215,17 +209,19 @@ class SearchChain(ChainBase): # 已处理数 _count = 0 + torrenthelper = TorrentHelper() + if mediainfo: # 英文标题应该在别名/原标题中,不需要再匹配 
logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}") - self.progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search) + progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search) for torrent in torrents: if global_vars.is_system_stopped: break _count += 1 - self.progress.update(value=(_count / _total) * 96, - text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...', - key=ProgressKey.Search) + progress.update(value=(_count / _total) * 96, + text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...', + key=ProgressKey.Search) if not torrent.title: continue @@ -236,10 +232,9 @@ class SearchChain(ChainBase): logger.info(f"种子名称应用识别词后发生改变:{torrent.title} => {torrent_meta.org_string}") # 季集数过滤 if season_episodes \ - and not self.torrenthelper.match_season_episodes( - torrent=torrent, - meta=torrent_meta, - season_episodes=season_episodes): + and not torrenthelper.match_season_episodes(torrent=torrent, + meta=torrent_meta, + season_episodes=season_episodes): continue # 比对IMDBID if torrent.imdbid \ @@ -250,17 +245,17 @@ class SearchChain(ChainBase): continue # 比对种子 - if self.torrenthelper.match_torrent(mediainfo=mediainfo, - torrent_meta=torrent_meta, - torrent=torrent): + if torrenthelper.match_torrent(mediainfo=mediainfo, + torrent_meta=torrent_meta, + torrent=torrent): # 匹配成功 _match_torrents.append((torrent, torrent_meta)) continue # 匹配完成 logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源") - self.progress.update(value=97, - text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源', - key=ProgressKey.Search) + progress.update(value=97, + text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源', + key=ProgressKey.Search) else: _match_torrents = [(t, MetaInfo(title=t.title, subtitle=t.description)) for t in torrents] @@ -273,17 +268,17 @@ class SearchChain(ChainBase): meta_info=t[1]) for t in _match_torrents] # 排序 - self.progress.update(value=99, - text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...', - 
key=ProgressKey.Search) - contexts = self.torrenthelper.sort_torrents(contexts) + progress.update(value=99, + text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...', + key=ProgressKey.Search) + contexts = torrenthelper.sort_torrents(contexts) # 结束进度 logger.info(f'搜索完成,共 {len(contexts)} 个资源') - self.progress.update(value=100, - text=f'搜索完成,共 {len(contexts)} 个资源', - key=ProgressKey.Search) - self.progress.end(ProgressKey.Search) + progress.update(value=100, + text=f'搜索完成,共 {len(contexts)} 个资源', + key=ProgressKey.Search) + progress.end(ProgressKey.Search) # 返回 return contexts @@ -307,9 +302,9 @@ class SearchChain(ChainBase): # 配置的索引站点 if not sites: - sites = self.systemconfig.get(SystemConfigKey.IndexerSites) or [] + sites = SystemConfigOper().get(SystemConfigKey.IndexerSites) or [] - for indexer in self.siteshelper.get_indexers(): + for indexer in SitesHelper().get_indexers(): # 检查站点索引开关 if not sites or indexer.get("id") in sites: indexer_sites.append(indexer) @@ -318,7 +313,8 @@ class SearchChain(ChainBase): return [] # 开始进度 - self.progress.start(ProgressKey.Search) + progress = ProgressHelper() + progress.start(ProgressKey.Search) # 开始计时 start_time = datetime.now() # 总数 @@ -326,9 +322,9 @@ class SearchChain(ChainBase): # 完成数 finish_count = 0 # 更新进度 - self.progress.update(value=0, - text=f"开始搜索,共 {total_num} 个站点 ...", - key=ProgressKey.Search) + progress.update(value=0, + text=f"开始搜索,共 {total_num} 个站点 ...", + key=ProgressKey.Search) # 结果集 results = [] # 多线程 @@ -356,18 +352,18 @@ class SearchChain(ChainBase): if result: results.extend(result) logger.info(f"站点搜索进度:{finish_count} / {total_num}") - self.progress.update(value=finish_count / total_num * 100, - text=f"正在搜索{keywords or ''},已完成 {finish_count} / {total_num} 个站点 ...", - key=ProgressKey.Search) + progress.update(value=finish_count / total_num * 100, + text=f"正在搜索{keywords or ''},已完成 {finish_count} / {total_num} 个站点 ...", + key=ProgressKey.Search) # 计算耗时 end_time = datetime.now() # 更新进度 - self.progress.update(value=100, - 
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒", - key=ProgressKey.Search) + progress.update(value=100, + text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒", + key=ProgressKey.Search) logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") # 结束进度 - self.progress.end(ProgressKey.Search) + progress.end(ProgressKey.Search) # 返回 return results diff --git a/app/chain/site.py b/app/chain/site.py index fd9c4007..12bddd31 100644 --- a/app/chain/site.py +++ b/app/chain/site.py @@ -16,7 +16,6 @@ from app.helper.browser import PlaywrightHelper from app.helper.cloudflare import under_challenge from app.helper.cookie import CookieHelper from app.helper.cookiecloud import CookieCloudHelper -from app.helper.message import MessageHelper from app.helper.rss import RssHelper from app.helper.sites import SitesHelper from app.log import logger @@ -34,13 +33,6 @@ class SiteChain(ChainBase): def __init__(self): super().__init__() - self.siteoper = SiteOper() - self.siteshelper = SitesHelper() - self.rsshelper = RssHelper() - self.cookiehelper = CookieHelper() - self.message = MessageHelper() - self.cookiecloud = CookieCloudHelper() - self.systemconfig = SystemConfigOper() # 特殊站点登录验证 self.special_site_test = { @@ -62,9 +54,9 @@ class SiteChain(ChainBase): """ userdata: SiteUserData = self.run_module("refresh_userdata", site=site) if userdata: - self.siteoper.update_userdata(domain=StringUtils.get_url_domain(site.get("domain")), - name=site.get("name"), - payload=userdata.dict()) + SiteOper().update_userdata(domain=StringUtils.get_url_domain(site.get("domain")), + name=site.get("name"), + payload=userdata.dict()) # 发送事件 EventManager().send_event(EventType.SiteRefreshed, { "site_id": site.get("id") @@ -100,7 +92,7 @@ class SiteChain(ChainBase): """ 刷新所有站点的用户数据 """ - sites = self.siteshelper.get_indexers() + sites = SitesHelper().get_indexers() any_site_updated = False result = {} for site in sites: @@ -303,21 
+295,24 @@ class SiteChain(ChainBase): return sub_domain logger.info("开始同步CookieCloud站点 ...") - cookies, msg = self.cookiecloud.download() + cookies, msg = CookieCloudHelper().download() if not cookies: logger.error(f"CookieCloud同步失败:{msg}") if manual: - self.message.put(msg, title="CookieCloud同步失败", role="system") + self.messagehelper.put(msg, title="CookieCloud同步失败", role="system") return False, msg # 保存Cookie或新增站点 _update_count = 0 _add_count = 0 _fail_count = 0 + siteshelper = SitesHelper() + siteoper = SiteOper() + rsshelper = RssHelper() for domain, cookie in cookies.items(): # 索引器信息 - indexer = self.siteshelper.get_indexer(domain) + indexer = siteshelper.get_indexer(domain) # 数据库的站点信息 - site_info = self.siteoper.get_by_domain(domain) + site_info = siteoper.get_by_domain(domain) if site_info and site_info.is_active == 1: # 站点已存在,检查站点连通性 status, msg = self.test(domain) @@ -327,7 +322,7 @@ class SiteChain(ChainBase): # 更新站点rss地址 if not site_info.public and not site_info.rss: # 自动生成rss地址 - rss_url, errmsg = self.rsshelper.get_rss_link( + rss_url, errmsg = rsshelper.get_rss_link( url=site_info.url, cookie=cookie, ua=site_info.ua or settings.USER_AGENT, @@ -335,13 +330,13 @@ class SiteChain(ChainBase): ) if rss_url: logger.info(f"更新站点 {domain} RSS地址 ...") - self.siteoper.update_rss(domain=domain, rss=rss_url) + siteoper.update_rss(domain=domain, rss=rss_url) else: logger.warn(errmsg) continue # 更新站点Cookie logger.info(f"更新站点 {domain} Cookie ...") - self.siteoper.update_cookie(domain=domain, cookies=cookie) + siteoper.update_cookie(domain=domain, cookies=cookie) _update_count += 1 elif indexer: if settings.COOKIECLOUD_BLACKLIST and any( @@ -396,21 +391,21 @@ class SiteChain(ChainBase): rss_url = None if not indexer.get("public") and domain_url: # 自动生成rss地址 - rss_url, errmsg = self.rsshelper.get_rss_link(url=domain_url, - cookie=cookie, - ua=settings.USER_AGENT, - proxy=proxy) + rss_url, errmsg = rsshelper.get_rss_link(url=domain_url, + cookie=cookie, + 
ua=settings.USER_AGENT, + proxy=proxy) if errmsg: logger.warn(errmsg) # 插入数据库 logger.info(f"新增站点 {indexer.get('name')} ...") - self.siteoper.add(name=indexer.get("name"), - url=domain_url, - domain=domain, - cookie=cookie, - rss=rss_url, - proxy=1 if proxy else 0, - public=1 if indexer.get("public") else 0) + siteoper.add(name=indexer.get("name"), + url=domain_url, + domain=domain, + cookie=cookie, + rss=rss_url, + proxy=1 if proxy else 0, + public=1 if indexer.get("public") else 0) _add_count += 1 # 通知站点更新 @@ -423,7 +418,7 @@ class SiteChain(ChainBase): if _fail_count > 0: ret_msg += f",{_fail_count}个站点添加失败,下次同步时将重试,也可以手动添加" if manual: - self.message.put(ret_msg, title="CookieCloud同步成功", role="system") + self.messagehelper.put(ret_msg, title="CookieCloud同步成功", role="system") logger.info(f"CookieCloud同步成功:{ret_msg}") return True, ret_msg @@ -442,29 +437,31 @@ class SiteChain(ChainBase): if str(domain).startswith("http"): domain = StringUtils.get_url_domain(domain) # 站点信息 - siteinfo = self.siteoper.get_by_domain(domain) + siteoper = SiteOper() + siteshelper = SitesHelper() + siteinfo = siteoper.get_by_domain(domain) if not siteinfo: logger.warn(f"未维护站点 {domain} 信息!") return # Cookie cookie = siteinfo.cookie # 索引器 - indexer = self.siteshelper.get_indexer(domain) + indexer = siteshelper.get_indexer(domain) if not indexer: logger.warn(f"站点 {domain} 索引器不存在!") return # 查询站点图标 - site_icon = self.siteoper.get_icon_by_domain(domain) + site_icon = siteoper.get_icon_by_domain(domain) if not site_icon or not site_icon.base64: logger.info(f"开始缓存站点 {indexer.get('name')} 图标 ...") icon_url, icon_base64 = self.__parse_favicon(url=indexer.get("domain"), cookie=cookie, ua=settings.USER_AGENT) if icon_url: - self.siteoper.update_icon(name=indexer.get("name"), - domain=domain, - icon_url=icon_url, - icon_base64=icon_base64) + siteoper.update_icon(name=indexer.get("name"), + domain=domain, + icon_url=icon_url, + icon_base64=icon_base64) logger.info(f"缓存站点 {indexer.get('name')} 图标成功") 
else: logger.warn(f"缓存站点 {indexer.get('name')} 图标失败") @@ -484,11 +481,12 @@ class SiteChain(ChainBase): # 获取主域名中间那段 domain_host = StringUtils.get_url_host(domain) # 查询以"site.domain_host"开头的配置项,并清除 - site_keys = self.systemconfig.all().keys() + systemconfig = SystemConfigOper() + site_keys = systemconfig.all().keys() for key in site_keys: if key.startswith(f"site.{domain_host}"): logger.info(f"清理站点配置:{key}") - self.systemconfig.delete(key) + systemconfig.delete(key) @eventmanager.register(EventType.SiteUpdated) def cache_site_userdata(self, event: Event): @@ -504,7 +502,7 @@ class SiteChain(ChainBase): return if str(domain).startswith("http"): domain = StringUtils.get_url_domain(domain) - indexer = self.siteshelper.get_indexer(domain) + indexer = SitesHelper().get_indexer(domain) if not indexer: return # 刷新站点用户数据 @@ -518,7 +516,8 @@ class SiteChain(ChainBase): """ # 检查域名是否可用 domain = StringUtils.get_url_domain(url) - site_info = self.siteoper.get_by_domain(domain) + siteoper = SiteOper() + site_info = siteoper.get_by_domain(domain) if not site_info: return False, f"站点【{url}】不存在" @@ -535,9 +534,9 @@ class SiteChain(ChainBase): # 统计 seconds = (datetime.now() - start_time).seconds if state: - self.siteoper.success(domain=domain, seconds=seconds) + siteoper.success(domain=domain, seconds=seconds) else: - self.siteoper.fail(domain) + siteoper.fail(domain) return state, message except Exception as e: return False, f"{str(e)}!" 
@@ -593,7 +592,7 @@ class SiteChain(ChainBase): """ 查询所有站点,发送消息 """ - site_list = self.siteoper.list() + site_list = SiteOper().list() if not site_list: self.post_message(Notification( channel=channel, @@ -633,7 +632,8 @@ class SiteChain(ChainBase): if not arg_str.isdigit(): return site_id = int(arg_str) - site = self.siteoper.get(site_id) + siteoper = SiteOper() + site = siteoper.get(site_id) if not site: self.post_message(Notification( channel=channel, @@ -641,7 +641,7 @@ class SiteChain(ChainBase): userid=userid)) return # 禁用站点 - self.siteoper.update(site_id, { + siteoper.update(site_id, { "is_active": False }) # 重新发送消息 @@ -655,25 +655,27 @@ class SiteChain(ChainBase): if not arg_str: return arg_strs = str(arg_str).split() + siteoper = SiteOper() for arg_str in arg_strs: arg_str = arg_str.strip() if not arg_str.isdigit(): continue site_id = int(arg_str) - site = self.siteoper.get(site_id) + site = siteoper.get(site_id) if not site: self.post_message(Notification( channel=channel, title=f"站点编号 {site_id} 不存在!", userid=userid)) return # 禁用站点 - self.siteoper.update(site_id, { + siteoper.update(site_id, { "is_active": True }) # 重新发送消息 self.remote_list(channel=channel, userid=userid, source=source) - def update_cookie(self, site_info: Site, + @staticmethod + def update_cookie(site_info: Site, username: str, password: str, two_step_code: Optional[str] = None) -> Tuple[bool, str]: """ 根据用户名密码更新站点Cookie @@ -684,7 +686,7 @@ class SiteChain(ChainBase): :return: (是否成功, 错误信息) """ # 更新站点Cookie - result = self.cookiehelper.get_site_cookie_ua( + result = CookieHelper().get_site_cookie_ua( url=site_info.url, username=username, password=password, @@ -695,7 +697,7 @@ class SiteChain(ChainBase): cookie, ua, msg = result if not cookie: return False, msg - self.siteoper.update(site_info.id, { + SiteOper().update(site_info.id, { "cookie": cookie, "ua": ua }) @@ -737,7 +739,7 @@ class SiteChain(ChainBase): # 站点ID site_id = int(site_id) # 站点信息 - site_info = self.siteoper.get(site_id) + 
site_info = SiteOper().get(site_id) if not site_info: self.post_message(Notification( channel=channel, diff --git a/app/chain/storage.py b/app/chain/storage.py index 3e52036e..50d9d25b 100644 --- a/app/chain/storage.py +++ b/app/chain/storage.py @@ -14,10 +14,6 @@ class StorageChain(ChainBase): 存储处理链 """ - def __init__(self): - super().__init__() - self.directoryhelper = DirectoryHelper() - def save_config(self, storage: str, conf: dict) -> None: """ 保存存储配置 @@ -192,7 +188,7 @@ class StorageChain(ChainBase): # 检查和删除上级目录 if dir_item and len(Path(dir_item.path).parts) > 2: # 如何目录是所有下载目录、媒体库目录的上级,则不处理 - for d in self.directoryhelper.get_dirs(): + for d in DirectoryHelper().get_dirs(): if d.download_path and Path(d.download_path).is_relative_to(Path(dir_item.path)): logger.debug(f"【{dir_item.storage}】{dir_item.path} 是下载目录本级或上级目录,不删除") return True diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index ba759201..572bbc59 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -24,36 +24,20 @@ from app.db.models.subscribe import Subscribe from app.db.site_oper import SiteOper from app.db.subscribe_oper import SubscribeOper from app.db.systemconfig_oper import SystemConfigOper -from app.helper.message import MessageHelper from app.helper.subscribe import SubscribeHelper from app.helper.torrent import TorrentHelper from app.log import logger from app.schemas import MediaRecognizeConvertEventData from app.schemas.types import MediaType, SystemConfigKey, MessageChannel, NotificationType, EventType, ChainEventType, \ ContentType -from app.utils.singleton import Singleton -class SubscribeChain(ChainBase, metaclass=Singleton): +class SubscribeChain(ChainBase): """ 订阅管理处理链 """ - def __init__(self): - super().__init__() - self._rlock = threading.RLock() - self.downloadchain = DownloadChain() - self.downloadhis = DownloadHistoryOper() - self.searchchain = SearchChain() - self.subscribeoper = SubscribeOper() - self.subscribehelper = SubscribeHelper() - 
self.torrentschain = TorrentsChain() - self.mediachain = MediaChain() - self.tmdbchain = TmdbChain() - self.message = MessageHelper() - self.systemconfig = SystemConfigOper() - self.torrenthelper = TorrentHelper() - self.siteoper = SiteOper() + _rlock = threading.RLock() def add(self, title: str, year: str, mtype: MediaType = None, @@ -87,11 +71,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton): if event and event.event_data: event_data: MediaRecognizeConvertEventData = event.event_data if event_data.media_dict: + mediachain = MediaChain() new_id = event_data.media_dict.get("id") if event_data.convert_type == "themoviedb": - return self.mediachain.recognize_media(meta=_meta, tmdbid=new_id) + return mediachain.recognize_media(meta=_meta, tmdbid=new_id) elif event_data.convert_type == "douban": - return self.mediachain.recognize_media(meta=_meta, doubanid=new_id) + return mediachain.recognize_media(meta=_meta, doubanid=new_id) return None logger.info(f'开始添加订阅,标题:{title} ...') @@ -111,7 +96,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): if not tmdbid: if doubanid: # 将豆瓣信息转换为TMDB信息 - tmdbinfo = self.mediachain.get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype) + tmdbinfo = MediaChain().get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype) if tmdbinfo: mediainfo = MediaInfo(tmdb_info=tmdbinfo) elif mediaid: @@ -214,7 +199,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): "filter_groups") else kwargs.get("filter_groups") }) # 操作数据库 - sid, err_msg = self.subscribeoper.add(mediainfo=mediainfo, season=season, username=username, **kwargs) + sid, err_msg = SubscribeOper().add(mediainfo=mediainfo, season=season, username=username, **kwargs) if not sid: logger.error(f'{mediainfo.title_year} {err_msg}') if not exist_ok and message: @@ -253,7 +238,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): "mediainfo": mediainfo.to_dict(), }) # 统计订阅 - self.subscribehelper.sub_reg_async({ + SubscribeHelper().sub_reg_async({ "name": title, 
"year": year, "type": metainfo.type.value, @@ -271,13 +256,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 返回结果 return sid, "" - def exists(self, mediainfo: MediaInfo, meta: MetaBase = None): + @staticmethod + def exists(mediainfo: MediaInfo, meta: MetaBase = None): """ 判断订阅是否已存在 """ - if self.subscribeoper.exists(tmdbid=mediainfo.tmdb_id, - doubanid=mediainfo.douban_id, - season=meta.begin_season if meta else None): + if SubscribeOper().exists(tmdbid=mediainfo.tmdb_id, + doubanid=mediainfo.douban_id, + season=meta.begin_season if meta else None): return True return False @@ -291,11 +277,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton): """ with self._rlock: logger.debug(f"search lock acquired at {datetime.now()}") + subscribeoper = SubscribeOper() if sid: - subscribe = self.subscribeoper.get(sid) + subscribe = subscribeoper.get(sid) subscribes = [subscribe] if subscribe else [] else: - subscribes = self.subscribeoper.list(self.get_states_for_search(state)) + subscribes = subscribeoper.list(self.get_states_for_search(state)) # 遍历订阅 for subscribe in subscribes: if global_vars.is_system_stopped: @@ -350,20 +337,20 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 优先级过滤规则 if subscribe.best_version: rule_groups = subscribe.filter_groups \ - or self.systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups) or [] + or SystemConfigOper().get(SystemConfigKey.BestVersionFilterRuleGroups) or [] else: rule_groups = subscribe.filter_groups \ - or self.systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups) or [] + or SystemConfigOper().get(SystemConfigKey.SubscribeFilterRuleGroups) or [] # 搜索,同时电视剧会过滤掉不需要的剧集 - contexts = self.searchchain.process(mediainfo=mediainfo, - keyword=subscribe.keyword, - no_exists=no_exists, - sites=sites, - rule_groups=rule_groups, - area="imdbid" if subscribe.search_imdbid else "title", - custom_words=custom_word_list, - filter_params=self.get_params(subscribe)) + contexts = 
SearchChain().process(mediainfo=mediainfo, + keyword=subscribe.keyword, + no_exists=no_exists, + sites=sites, + rule_groups=rule_groups, + area="imdbid" if subscribe.search_imdbid else "title", + custom_words=custom_word_list, + filter_params=self.get_params(subscribe)) if not contexts: logger.warn(f'订阅 {subscribe.keyword or subscribe.name} 未搜索到资源') self.finish_subscribe_or_not(subscribe=subscribe, meta=meta, @@ -404,7 +391,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): continue # 自动下载 - downloads, lefts = self.downloadchain.batch_download( + downloads, lefts = DownloadChain().batch_download( contexts=matched_contexts, no_exists=no_exists, userid=subscribe.username, @@ -415,7 +402,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): ) # 同步外部修改,更新订阅信息 - subscribe = self.subscribeoper.get(subscribe.id) + subscribe = subscribeoper.get(subscribe.id) # 判断是否应完成订阅 if subscribe: @@ -424,17 +411,17 @@ class SubscribeChain(ChainBase, metaclass=Singleton): finally: # 如果状态为N则更新为R if subscribe and subscribe.state == 'N': - self.subscribeoper.update(subscribe.id, {'state': 'R'}) + subscribeoper.update(subscribe.id, {'state': 'R'}) # 手动触发时发送系统消息 if manual: if subscribes: if sid: - self.message.put(f'{subscribes[0].name} 搜索完成!', title="订阅搜索", role="system") + self.messagehelper.put(f'{subscribes[0].name} 搜索完成!', title="订阅搜索", role="system") else: - self.message.put('所有订阅搜索完成!', title="订阅搜索", role="system") + self.messagehelper.put('所有订阅搜索完成!', title="订阅搜索", role="system") else: - self.message.put('没有找到订阅!', title="订阅搜索", role="system") + self.messagehelper.put('没有找到订阅!', title="订阅搜索", role="system") logger.debug(f"search Lock released at {datetime.now()}") def update_subscribe_priority(self, subscribe: Subscribe, meta: MetaBase, @@ -449,7 +436,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 当前下载资源的优先级 priority = max([item.torrent_info.pri_order for item in downloads]) # 订阅存在待定策略,不管是否已完成,均需更新订阅信息 - self.subscribeoper.update(subscribe.id, { + 
SubscribeOper().update(subscribe.id, { "current_priority": priority, "last_update": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) @@ -506,17 +493,18 @@ class SubscribeChain(ChainBase, metaclass=Singleton): if sites is None: return self.match( - self.torrentschain.refresh(sites=sites) + TorrentsChain().refresh(sites=sites) ) - def get_sub_sites(self, subscribe: Subscribe) -> List[int]: + @staticmethod + def get_sub_sites(subscribe: Subscribe) -> List[int]: """ 获取订阅中涉及的站点清单 :param subscribe: 订阅信息对象 :return: 涉及的站点清单 """ # 从系统配置获取默认订阅站点 - default_sites = self.systemconfig.get(SystemConfigKey.RssSites) or [] + default_sites = SystemConfigOper().get(SystemConfigKey.RssSites) or [] # 如果订阅未指定站点,直接返回默认站点 if not subscribe.sites: return default_sites @@ -536,7 +524,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): :return: 返回[]代表所有站点命中,返回None代表没有订阅 """ # 查询所有订阅 - subscribes = self.subscribeoper.list(self.get_states_for_search('R')) + subscribes = SubscribeOper().list(self.get_states_for_search('R')) if not subscribes: return None ret_sites = [] @@ -561,7 +549,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): with self._rlock: logger.debug(f"match lock acquired at {datetime.now()}") # 所有订阅 - subscribes = self.subscribeoper.list(self.get_states_for_search('R')) + subscribes = SubscribeOper().list(self.get_states_for_search('R')) # 预识别所有未识别的种子 processed_torrents: Dict[str, List[Context]] = {} @@ -598,7 +586,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 订阅的站点域名列表 domains = [] if subscribe.sites: - domains = self.siteoper.get_domains_by_ids(subscribe.sites) + domains = SiteOper().get_domains_by_ids(subscribe.sites) # 识别媒体信息 mediainfo: MediaInfo = self.recognize_media(meta=meta, mtype=meta.type, tmdbid=subscribe.tmdbid, @@ -628,6 +616,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 遍历预识别后的种子 _match_context = [] + torrenthelper = TorrentHelper() + systemconfig = SystemConfigOper() + wordsmatcher = WordsMatcher() for domain, contexts in 
processed_torrents.items(): if global_vars.is_system_stopped: break @@ -650,8 +641,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton): # 有自定义识别词时,需要判断是否需要重新识别 if custom_words_list: # 使用org_string,应用一次后理论上不能再次应用 - _, apply_words = WordsMatcher().prepare(torrent_meta.org_string, - custom_words=custom_words_list) + _, apply_words = wordsmatcher.prepare(torrent_meta.org_string, + custom_words=custom_words_list) if apply_words: logger.info( f'{torrent_info.site_name} - {torrent_info.title} 因订阅存在自定义识别词,重新识别元数据...') @@ -673,9 +664,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton): if not torrent_mediainfo or (not torrent_mediainfo.tmdb_id and not torrent_mediainfo.douban_id): logger.info( f'{torrent_info.site_name} - {torrent_info.title} 重新识别失败,尝试通过标题匹配...') - if self.torrenthelper.match_torrent(mediainfo=mediainfo, - torrent_meta=torrent_meta, - torrent=torrent_info): + if torrenthelper.match_torrent(mediainfo=mediainfo, + torrent_meta=torrent_meta, + torrent=torrent_info): # 匹配成功 logger.info( f'{mediainfo.title_year} 通过标题匹配到可选资源:{torrent_info.site_name} - {torrent_info.title}') @@ -739,17 +730,17 @@ class SubscribeChain(ChainBase, metaclass=Singleton): continue # 匹配订阅附加参数 - if not self.torrenthelper.filter_torrent(torrent_info=torrent_info, - filter_params=self.get_params(subscribe)): + if not torrenthelper.filter_torrent(torrent_info=torrent_info, + filter_params=self.get_params(subscribe)): continue # 优先级过滤规则 if subscribe.best_version: rule_groups = subscribe.filter_groups \ - or self.systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups) + or systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups) else: rule_groups = subscribe.filter_groups \ - or self.systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups) + or systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups) result: List[TorrentInfo] = self.filter_torrents( rule_groups=rule_groups, torrent_list=[torrent_info], @@ -789,33 +780,22 @@ class SubscribeChain(ChainBase, 
metaclass=Singleton): # 开始批量择优下载 logger.info(f'{mediainfo.title_year} 匹配完成,共匹配到{len(_match_context)}个资源') - downloads, lefts = self.downloadchain.batch_download(contexts=_match_context, - no_exists=no_exists, - userid=subscribe.username, - username=subscribe.username, - save_path=subscribe.save_path, - downloader=subscribe.downloader, - source=self.get_subscribe_source_keyword(subscribe) - ) + downloads, lefts = DownloadChain().batch_download(contexts=_match_context, + no_exists=no_exists, + userid=subscribe.username, + username=subscribe.username, + save_path=subscribe.save_path, + downloader=subscribe.downloader, + source=self.get_subscribe_source_keyword(subscribe) + ) # 同步外部修改,更新订阅信息 - subscribe = self.subscribeoper.get(subscribe.id) + subscribe = SubscribeOper().get(subscribe.id) # 判断是否要完成订阅 if subscribe: self.finish_subscribe_or_not(subscribe=subscribe, meta=meta, mediainfo=mediainfo, downloads=downloads, lefts=lefts) - # 清理内存 - _match_context.clear() - del _match_context - downloads.clear() - del downloads - lefts.clear() - del lefts - - # 清理内存 - processed_torrents.clear() - del processed_torrents logger.debug(f"match Lock released at {datetime.now()}") @@ -824,7 +804,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton): 定时检查订阅,更新订阅信息 """ # 查询所有订阅 - subscribes = self.subscribeoper.list() + subscribeoper = SubscribeOper() + subscribes = subscribeoper.list() if not subscribes: # 没有订阅不运行 return @@ -863,7 +844,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): total_episode = subscribe.total_episode lack_episode = subscribe.lack_episode # 更新TMDB信息 - self.subscribeoper.update(subscribe.id, { + subscribeoper.update(subscribe.id, { "name": mediainfo.title, "year": mediainfo.year, "vote": mediainfo.vote_average, @@ -877,28 +858,30 @@ class SubscribeChain(ChainBase, metaclass=Singleton): }) logger.info(f'{subscribe.name} 订阅元数据更新完成') - def follow(self): + @staticmethod + def follow(): """ 刷新follow的用户分享,并自动添加订阅 """ - follow_users: List[str] = 
self.systemconfig.get(SystemConfigKey.FollowSubscribers) + follow_users: List[str] = SystemConfigOper().get(SystemConfigKey.FollowSubscribers) if not follow_users: return - share_subs = self.subscribehelper.get_shares() + share_subs = SubscribeHelper().get_shares() logger.info(f'开始刷新follow用户分享订阅 ...') success_count = 0 + subscribeoper = SubscribeOper() for share_sub in share_subs: uid = share_sub.get("share_uid") if uid and uid in follow_users: # 订阅已存在则跳过 - if self.subscribeoper.exists(tmdbid=share_sub.get("tmdbid"), - doubanid=share_sub.get("doubanid"), - season=share_sub.get("season")): + if subscribeoper.exists(tmdbid=share_sub.get("tmdbid"), + doubanid=share_sub.get("doubanid"), + season=share_sub.get("season")): continue # 已经订阅过跳过 - if self.subscribeoper.exist_history(tmdbid=share_sub.get("tmdbid"), - doubanid=share_sub.get("doubanid"), - season=share_sub.get("season")): + if subscribeoper.exist_history(tmdbid=share_sub.get("tmdbid"), + doubanid=share_sub.get("doubanid"), + season=share_sub.get("season")): continue # 去除无效属性 for key in list(share_sub.keys()): @@ -939,7 +922,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton): logger.error(f'follow用户分享订阅 {title} 添加失败:{message}') logger.info(f'follow用户分享订阅刷新完成,共添加 {success_count} 个订阅') - def __update_subscribe_note(self, subscribe: Subscribe, downloads: Optional[List[Context]]): + @staticmethod + def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]): """ 更新已下载信息到note字段 """ @@ -971,7 +955,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): note = list(set(note).union(set(items))) # 更新订阅 if note: - self.subscribeoper.update(subscribe.id, { + SubscribeOper().update(subscribe.id, { "note": note }) @@ -995,7 +979,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton): return note return [] - def __update_lack_episodes(self, lefts: Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]], + @staticmethod + def __update_lack_episodes(lefts: Dict[Union[int, str], 
Dict[int, schemas.NotExistMediaInfo]], subscribe: Subscribe, mediainfo: MediaInfo, update_date: Optional[bool] = False): @@ -1028,7 +1013,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): update_data["lack_episode"] = lack_episode # 更新数据库 if update_data: - self.subscribeoper.update(subscribe.id, update_data) + SubscribeOper().update(subscribe.id, update_data) def __finish_subscribe(self, subscribe: Subscribe, mediainfo: MediaInfo, meta: MetaBase): """ @@ -1041,9 +1026,10 @@ class SubscribeChain(ChainBase, metaclass=Singleton): msgstr = "订阅" if not subscribe.best_version else "洗版" logger.info(f'{mediainfo.title_year} 完成{msgstr}') # 新增订阅历史 - self.subscribeoper.add_history(**subscribe.to_dict()) + subscribeoper = SubscribeOper() + subscribeoper.add_history(**subscribe.to_dict()) # 删除订阅 - self.subscribeoper.delete(subscribe.id) + subscribeoper.delete(subscribe.id) # 发送通知 if mediainfo.type == MediaType.TV: link = settings.MP_DOMAIN('#/subscribe/tv?tab=mysub') @@ -1070,7 +1056,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): "mediainfo": mediainfo.to_dict(), }) # 统计订阅 - self.subscribehelper.sub_done_async({ + SubscribeHelper().sub_done_async({ "tmdbid": mediainfo.tmdb_id, "doubanid": mediainfo.douban_id }) @@ -1080,7 +1066,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): """ 查询订阅并发送消息 """ - subscribes = self.subscribeoper.list() + subscribes = SubscribeOper().list() if not subscribes: self.post_message(schemas.Notification(channel=channel, source=source, @@ -1114,20 +1100,22 @@ class SubscribeChain(ChainBase, metaclass=Singleton): "[id]为订阅编号", userid=userid)) return arg_strs = str(arg_str).split() + subscribeoper = SubscribeOper() + subscribehelper = SubscribeHelper() for arg_str in arg_strs: arg_str = arg_str.strip() if not arg_str.isdigit(): continue subscribe_id = int(arg_str) - subscribe = self.subscribeoper.get(subscribe_id) + subscribe = subscribeoper.get(subscribe_id) if not subscribe: 
self.post_message(schemas.Notification(channel=channel, source=source, title=f"订阅编号 {subscribe_id} 不存在!", userid=userid)) return # 删除订阅 - self.subscribeoper.delete(subscribe_id) + subscribeoper.delete(subscribe_id) # 统计订阅 - self.subscribehelper.sub_done_async({ + subscribehelper.sub_done_async({ "tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid }) @@ -1253,13 +1241,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton): site_id = event_data.get("site_id") if not site_id: return + subscribeoper = SubscribeOper() if site_id == "*": # 站点被重置 SystemConfigOper().set(SystemConfigKey.RssSites, []) - for subscribe in self.subscribeoper.list(): + for subscribe in subscribeoper.list(): if not subscribe.sites: continue - self.subscribeoper.update(subscribe.id, { + subscribeoper.update(subscribe.id, { "sites": [] }) return @@ -1269,14 +1258,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton): selected_sites.remove(site_id) SystemConfigOper().set(SystemConfigKey.RssSites, selected_sites) # 查询所有订阅 - for subscribe in self.subscribeoper.list(): + for subscribe in subscribeoper.list(): if not subscribe.sites: continue sites = subscribe.sites or [] if site_id not in sites: continue sites.remove(site_id) - self.subscribeoper.update(subscribe.id, { + subscribeoper.update(subscribe.id, { "sites": sites }) @@ -1301,12 +1290,13 @@ class SubscribeChain(ChainBase, metaclass=Singleton): return None return value.get(default_config_key) or None - def get_params(self, subscribe: Subscribe): + @staticmethod + def get_params(subscribe: Subscribe): """ 获取订阅默认参数 """ # 默认过滤规则 - default_rule = self.systemconfig.get(SystemConfigKey.SubscribeDefaultParams) or {} + default_rule = SystemConfigOper().get(SystemConfigKey.SubscribeDefaultParams) or {} return { key: value for key, value in { "include": subscribe.include or default_rule.get("include"), @@ -1334,7 +1324,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): episodes: Dict[int, schemas.SubscribeEpisodeInfo] = {} if 
subscribe.tmdbid and subscribe.type == MediaType.TV.value: # 查询TMDB中的集信息 - tmdb_episodes = self.tmdbchain.tmdb_episodes( + tmdb_episodes = TmdbChain().tmdb_episodes( tmdbid=subscribe.tmdbid, season=subscribe.season, episode_group=subscribe.episode_group @@ -1359,11 +1349,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton): episodes[0] = info # 所有下载记录 - download_his = self.downloadhis.get_by_mediaid(tmdbid=subscribe.tmdbid, doubanid=subscribe.doubanid) + downloadhis = DownloadHistoryOper() + download_his = downloadhis.get_by_mediaid(tmdbid=subscribe.tmdbid, doubanid=subscribe.doubanid) if download_his: for his in download_his: # 查询下载文件 - files = self.downloadhis.get_files_by_hash(his.download_hash) + files = downloadhis.get_files_by_hash(his.download_hash) if files: for file in files: # 识别文件名 @@ -1457,7 +1448,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton): subscribe.season: subscribe.total_episode } # 查询媒体库缺失的媒体信息 - exist_flag, no_exists = self.downloadchain.get_no_exists_info( + exist_flag, no_exists = DownloadChain().get_no_exists_info( meta=meta, mediainfo=mediainfo, totals=totals diff --git a/app/chain/system.py b/app/chain/system.py index 7696364e..29d6a3c7 100644 --- a/app/chain/system.py +++ b/app/chain/system.py @@ -8,24 +8,18 @@ from app.core.config import settings from app.log import logger from app.schemas import Notification, MessageChannel from app.utils.http import RequestUtils -from app.utils.singleton import Singleton from app.utils.system import SystemUtils -from helper.system import SystemHelper +from app.helper.system import SystemHelper from version import FRONTEND_VERSION, APP_VERSION -class SystemChain(ChainBase, metaclass=Singleton): +class SystemChain(ChainBase): """ 系统级处理链 """ _restart_file = "__system_restart__" - def __init__(self): - super().__init__() - # 重启完成检测 - self.restart_finish() - def remote_clear_cache(self, channel: MessageChannel, userid: Union[int, str], source: Optional[str] = None): """ 清理系统缓存 diff --git 
a/app/chain/tmdb.py b/app/chain/tmdb.py index 771868b6..d7fc14eb 100644 --- a/app/chain/tmdb.py +++ b/app/chain/tmdb.py @@ -3,13 +3,11 @@ from typing import Optional, List from app import schemas from app.chain import ChainBase -from app.core.cache import cached from app.core.context import MediaInfo from app.schemas import MediaType -from app.utils.singleton import Singleton -class TmdbChain(ChainBase, metaclass=Singleton): +class TmdbChain(ChainBase): """ TheMovieDB处理链,单例运行 """ @@ -145,7 +143,6 @@ class TmdbChain(ChainBase, metaclass=Singleton): """ return self.run_module("tmdb_person_credits", person_id=person_id, page=page) - @cached(maxsize=1, ttl=3600) def get_random_wallpager(self) -> Optional[str]: """ 获取随机壁纸,缓存1个小时 @@ -159,7 +156,6 @@ class TmdbChain(ChainBase, metaclass=Singleton): return info.backdrop_path return None - @cached(maxsize=1, ttl=3600) def get_trending_wallpapers(self, num: Optional[int] = 10) -> List[str]: """ 获取所有流行壁纸 diff --git a/app/chain/torrents.py b/app/chain/torrents.py index 3272fe8c..f2701d12 100644 --- a/app/chain/torrents.py +++ b/app/chain/torrents.py @@ -17,11 +17,10 @@ from app.helper.torrent import TorrentHelper from app.log import logger from app.schemas import Notification from app.schemas.types import SystemConfigKey, MessageChannel, NotificationType, MediaType -from app.utils.singleton import Singleton from app.utils.string import StringUtils -class TorrentsChain(ChainBase, metaclass=Singleton): +class TorrentsChain(ChainBase): """ 站点首页或RSS种子处理链,服务于订阅、刷流等 """ @@ -29,22 +28,6 @@ class TorrentsChain(ChainBase, metaclass=Singleton): _spider_file = "__torrents_cache__" _rss_file = "__rss_cache__" - def __init__(self): - super().__init__() - self.siteshelper = SitesHelper() - self.siteoper = SiteOper() - self.rsshelper = RssHelper() - self.systemconfig = SystemConfigOper() - self.mediachain = MediaChain() - self.torrenthelper = TorrentHelper() - - def __del__(self): - """ - 析构函数,停止内存监控 - """ - if hasattr(self, 
'memory_manager'): - self.memory_manager.stop_monitoring() - @property def cache_file(self) -> str: """ @@ -99,7 +82,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): :param page: 页码 """ logger.info(f'开始获取站点 {domain} 最新种子 ...') - site = self.siteshelper.get_indexer(domain) + site = SitesHelper().get_indexer(domain) if not site: logger.error(f'站点 {domain} 不存在!') return [] @@ -112,15 +95,15 @@ class TorrentsChain(ChainBase, metaclass=Singleton): :param domain: 站点域名 """ logger.info(f'开始获取站点 {domain} RSS ...') - site = self.siteshelper.get_indexer(domain) + site = SitesHelper().get_indexer(domain) if not site: logger.error(f'站点 {domain} 不存在!') return [] if not site.get("rss"): logger.error(f'站点 {domain} 未配置RSS地址!') return [] - rss_items = self.rsshelper.parse(site.get("rss"), True if site.get("proxy") else False, - timeout=int(site.get("timeout") or 30)) + rss_items = RssHelper().parse(site.get("rss"), True if site.get("proxy") else False, + timeout=int(site.get("timeout") or 30)) if rss_items is None: # rss过期,尝试保留原配置生成新的rss self.__renew_rss_url(domain=domain, site=site) @@ -164,7 +147,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): # 刷新站点 if not sites: - sites = self.systemconfig.get(SystemConfigKey.RssSites) or [] + sites = SystemConfigOper().get(SystemConfigKey.RssSites) or [] # 读取缓存 torrents_cache = self.get_torrents() @@ -172,10 +155,10 @@ class TorrentsChain(ChainBase, metaclass=Singleton): # 缓存过滤掉无效种子 for _domain, _torrents in torrents_cache.items(): torrents_cache[_domain] = [_torrent for _torrent in _torrents - if not self.torrenthelper.is_invalid(_torrent.torrent_info.enclosure)] + if not TorrentHelper().is_invalid(_torrent.torrent_info.enclosure)] # 所有站点索引 - indexers = self.siteshelper.get_indexers() + indexers = SitesHelper().get_indexers() # 需要刷新的站点domain domains = [] @@ -222,7 +205,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): and torrent.category == MediaType.TV.value: meta.type = MediaType.TV # 识别媒体信息 - mediainfo: MediaInfo 
= self.mediachain.recognize_by_meta(meta) + mediainfo: MediaInfo = MediaChain().recognize_by_meta(meta) if not mediainfo: logger.warn(f'{torrent.title} 未识别到媒体信息') # 存储空的媒体信息 @@ -282,7 +265,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): # RSS链接过期 logger.error(f"站点 {domain} RSS链接已过期,正在尝试自动获取!") # 自动生成rss地址 - rss_url, errmsg = self.rsshelper.get_rss_link( + rss_url, errmsg = RssHelper().get_rss_link( url=site.get("url"), cookie=site.get("cookie"), ua=site.get("ua") or settings.USER_AGENT, @@ -296,7 +279,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): # 获取过期rss除去passkey部分 new_rss = re.sub(r'&passkey=([a-zA-Z0-9]+)', f'&passkey={new_passkey}', site.get("rss")) logger.info(f"更新站点 {domain} RSS地址 ...") - self.siteoper.update_rss(domain=domain, rss=new_rss) + SiteOper().update_rss(domain=domain, rss=new_rss) else: # 发送消息 self.post_message( diff --git a/app/chain/transfer.py b/app/chain/transfer.py index f8e2f3c9..e5910e60 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -328,7 +328,8 @@ class JobManager: # 计算状态为完成的任务数 if __mediaid__ not in self._job_view: return 0 - return sum([task.fileitem.size for task in self._job_view[__mediaid__].tasks if task.state == "completed" and task.fileitem.size is not None]) + return sum([task.fileitem.size for task in self._job_view[__mediaid__].tasks if + task.state == "completed" and task.fileitem.size is not None]) def total(self) -> int: """ @@ -371,14 +372,6 @@ class TransferChain(ChainBase, metaclass=Singleton): def __init__(self): super().__init__() - self.downloadhis = DownloadHistoryOper() - self.transferhis = TransferHistoryOper() - self.progress = ProgressHelper() - self.mediachain = MediaChain() - self.tmdbchain = TmdbChain() - self.storagechain = StorageChain() - self.systemconfig = SystemConfigOper() - self.directoryhelper = DirectoryHelper() self.jobview = JobManager() # 启动整理任务 @@ -397,11 +390,12 @@ class TransferChain(ChainBase, metaclass=Singleton): """ 整理完成后处理 """ + transferhis = 
TransferHistoryOper() if not transferinfo.success: # 转移失败 logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}") # 新增转移失败历史记录 - self.transferhis.add_fail( + transferhis.add_fail( fileitem=task.fileitem, mode=transferinfo.transfer_type if transferinfo else '', downloader=task.downloader, @@ -428,7 +422,7 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") # 新增转移成功历史记录 - self.transferhis.add_success( + transferhis.add_success( fileitem=task.fileitem, mode=transferinfo.transfer_type if transferinfo else '', downloader=task.downloader, @@ -457,6 +451,7 @@ class TransferChain(ChainBase, metaclass=Singleton): tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) # 记录已处理的种子hash processed_hashes = set() + storagechain = StorageChain() for t in tasks: # 下载器hash if t.download_hash and t.download_hash not in processed_hashes: @@ -465,7 +460,7 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.info(f"移动模式删除种子成功:{t.download_hash} ") # 删除残留目录 if t.fileitem: - self.storagechain.delete_media_file(t.fileitem, delete_self=False) + storagechain.delete_media_file(t.fileitem, delete_self=False) # 整理完成且有成功的任务时 if self.jobview.is_finished(task): # 发送通知,实时手动整理时不发 @@ -543,6 +538,8 @@ class TransferChain(ChainBase, metaclass=Singleton): # 失败数量 fail_num = 0 + progress = ProgressHelper() + while not global_vars.is_system_stopped: try: item: TransferQueue = self._queue.get(block=False) @@ -556,24 +553,24 @@ class TransferChain(ChainBase, metaclass=Singleton): if __queue_start: logger.info("开始整理队列处理...") # 启动进度 - self.progress.start(ProgressKey.FileTransfer) + progress.start(ProgressKey.FileTransfer) # 重置计数 processed_num = 0 fail_num = 0 total_num = self.jobview.total() __process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..." 
logger.info(__process_msg) - self.progress.update(value=0, - text=__process_msg, - key=ProgressKey.FileTransfer) + progress.update(value=0, + text=__process_msg, + key=ProgressKey.FileTransfer) # 队列已开始 __queue_start = False # 更新进度 __process_msg = f"正在整理 {fileitem.name} ..." logger.info(__process_msg) - self.progress.update(value=processed_num / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + progress.update(value=processed_num / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) # 整理 state, err_msg = self.__handle_transfer(task=task, callback=item.callback) if not state: @@ -583,18 +580,18 @@ class TransferChain(ChainBase, metaclass=Singleton): processed_num += 1 __process_msg = f"{fileitem.name} 整理完成" logger.info(__process_msg) - self.progress.update(value=processed_num / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + progress.update(value=processed_num / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) except queue.Empty: if not __queue_start: # 结束进度 __end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) - self.progress.update(value=100, - text=__end_msg, - key=ProgressKey.FileTransfer) - self.progress.end(ProgressKey.FileTransfer) + progress.update(value=100, + text=__end_msg, + key=ProgressKey.FileTransfer) + progress.end(ProgressKey.FileTransfer) # 重置计数 processed_num = 0 fail_num = 0 @@ -614,6 +611,7 @@ class TransferChain(ChainBase, metaclass=Singleton): """ try: # 识别 + transferhis = TransferHistoryOper() if not task.mediainfo: mediainfo = None download_history = task.download_history @@ -633,7 +631,7 @@ class TransferChain(ChainBase, metaclass=Singleton): mediainfo.category = download_history.media_category else: # 识别媒体信息 - mediainfo = self.mediachain.recognize_by_meta(task.meta) + mediainfo = MediaChain().recognize_by_meta(task.meta) # 更新媒体图片 if mediainfo: @@ -641,7 +639,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if not 
mediainfo: # 新增整理失败历史记录 - his = self.transferhis.add_fail( + his = transferhis.add_fail( fileitem=task.fileitem, mode=task.transfer_type, meta=task.meta, @@ -661,8 +659,8 @@ class TransferChain(ChainBase, metaclass=Singleton): # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title if not settings.SCRAP_FOLLOW_TMDB: - transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, - mtype=mediainfo.type.value) + transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, + mtype=mediainfo.type.value) if transfer_history: mediainfo.title = transfer_history.title @@ -682,7 +680,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 默认值1 if season_num is None: season_num = 1 - task.episodes_info = self.tmdbchain.tmdb_episodes( + task.episodes_info = TmdbChain().tmdb_episodes( tmdbid=task.mediainfo.tmdb_id, season=season_num, episode_group=task.mediainfo.episode_group @@ -692,15 +690,15 @@ class TransferChain(ChainBase, metaclass=Singleton): if not task.target_directory: if task.target_path: # 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配 - task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, - dest_path=task.target_path, - target_storage=task.target_storage) + task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo, + dest_path=task.target_path, + target_storage=task.target_storage) else: # 启用源目录匹配时,根据源目录匹配下载目录,否则按源目录同盘优先原则,如无源目录,则根据媒体信息获取目标目录 - task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, - storage=task.fileitem.storage, - src_path=Path(task.fileitem.path), - target_storage=task.target_storage) + task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo, + storage=task.fileitem.storage, + src_path=Path(task.fileitem.path), + target_storage=task.target_storage) if not task.target_storage and task.target_directory: task.target_storage = task.target_directory.library_storage @@ -784,7 +782,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 全局锁,避免重复处理 with 
downloader_lock: # 获取下载器监控目录 - download_dirs = self.directoryhelper.get_download_dirs() + download_dirs = DirectoryHelper().get_download_dirs() # 如果没有下载器监控的目录则不处理 if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local" for dir_info in download_dirs): @@ -820,7 +818,7 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理") continue # 查询下载记录识别情况 - downloadhis: DownloadHistory = self.downloadhis.get_by_hash(torrent.hash) + downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash(torrent.hash) if downloadhis: # 类型 try: @@ -868,7 +866,7 @@ class TransferChain(ChainBase, metaclass=Singleton): return True def __get_trans_fileitems( - self, fileitem: FileItem, depth: int = 1 + self, fileitem: FileItem, depth: int = 1 ) -> List[Tuple[FileItem, bool]]: """ 获取整理目录或文件列表 @@ -876,6 +874,7 @@ class TransferChain(ChainBase, metaclass=Singleton): :param fileitem: 文件项 :param depth: 递归深度,默认为1 """ + storagechain = StorageChain() def __contains_bluray_sub(_fileitems: List[FileItem]) -> bool: """ @@ -899,10 +898,10 @@ class TransferChain(ChainBase, metaclass=Singleton): """ for p in _path.parents: if p.name == "BDMV": - return self.storagechain.get_file_item(storage=_storage, path=p.parent) + return storagechain.get_file_item(storage=_storage, path=p.parent) return None - if not self.storagechain.get_item(fileitem): + if not storagechain.get_item(fileitem): logger.warn(f"目录或文件不存在:{fileitem.path}") return [] @@ -917,7 +916,7 @@ class TransferChain(ChainBase, metaclass=Singleton): return [(fileitem, False)] # 蓝光原盘根目录 - sub_items = self.storagechain.list_files(fileitem) or [] + sub_items = storagechain.list_files(fileitem) or [] if __contains_bluray_sub(sub_items): return [(fileitem, True)] @@ -994,7 +993,7 @@ class TransferChain(ChainBase, metaclass=Singleton): offset=epformat.offset) if epformat else None # 整理屏蔽词 - transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords) + 
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) # 汇总错误信息 err_msgs: List[str] = [] # 待整理目录或文件项 @@ -1013,7 +1012,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 如果是目录且不是⼀蓝光原盘,获取所有文件并整理 if trans_item.type == "dir" and not bluray_dir: # 遍历获取下载目录所有文件(递归) - if files := self.storagechain.list_files(trans_item, recursion=True): + if files := StorageChain().list_files(trans_item, recursion=True): file_items.extend([(file, False) for file in files]) else: file_items.append((trans_item, bluray_dir)) @@ -1062,7 +1061,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 整理成功的不再处理 if not force: - transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage) + transferd = TransferHistoryOper().get_by_src(file_item.path, storage=file_item.storage) if transferd: if not transferd.status: all_success = False @@ -1098,14 +1097,15 @@ class TransferChain(ChainBase, metaclass=Singleton): # 根据父路径获取下载历史 download_history = None + downloadhis = DownloadHistoryOper() if bluray_dir: # 蓝光原盘,按目录名查询 - download_history = self.downloadhis.get_by_path(str(file_path)) + download_history = downloadhis.get_by_path(str(file_path)) else: # 按文件全路径查询 - download_file = self.downloadhis.get_file_by_fullpath(str(file_path)) + download_file = downloadhis.get_file_by_fullpath(str(file_path)) if download_file: - download_history = self.downloadhis.get_by_hash(download_file.download_hash) + download_history = downloadhis.get_by_hash(download_file.download_hash) # 获取下载Hash if download_history and (not downloader or not download_hash): @@ -1148,12 +1148,13 @@ class TransferChain(ChainBase, metaclass=Singleton): fail_num = 0 # 启动进度 - self.progress.start(ProgressKey.FileTransfer) + progress = ProgressHelper() + progress.start(ProgressKey.FileTransfer) __process_msg = f"开始整理,共 {total_num} 个文件 ..." 
logger.info(__process_msg) - self.progress.update(value=0, - text=__process_msg, - key=ProgressKey.FileTransfer) + progress.update(value=0, + text=__process_msg, + key=ProgressKey.FileTransfer) for transfer_task in transfer_tasks: if global_vars.is_system_stopped: @@ -1163,9 +1164,9 @@ class TransferChain(ChainBase, metaclass=Singleton): # 更新进度 __process_msg = f"正在整理 ({processed_num + fail_num + 1}/{total_num}){transfer_task.fileitem.name} ..." logger.info(__process_msg) - self.progress.update(value=(processed_num + fail_num) / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + progress.update(value=(processed_num + fail_num) / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) state, err_msg = self.__handle_transfer( task=transfer_task, callback=self.__default_callback @@ -1181,10 +1182,10 @@ class TransferChain(ChainBase, metaclass=Singleton): # 整理结束 __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) - self.progress.update(value=100, - text=__end_msg, - key=ProgressKey.FileTransfer) - self.progress.end(ProgressKey.FileTransfer) + progress.update(value=100, + text=__end_msg, + key=ProgressKey.FileTransfer) + progress.end(ProgressKey.FileTransfer) return all_success, ",".join(err_msgs) @@ -1239,7 +1240,7 @@ class TransferChain(ChainBase, metaclass=Singleton): :param mediaid: TMDB ID/豆瓣ID """ # 查询历史记录 - history: TransferHistory = self.transferhis.get(logid) + history: TransferHistory = TransferHistoryOper().get(logid) if not history: logger.error(f"整理记录不存在,ID:{logid}") return False, "整理记录不存在" @@ -1255,7 +1256,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) else: - mediainfo = self.mediachain.recognize_by_path(str(src_path), episode_group=history.episode_group) + mediainfo = MediaChain().recognize_by_path(str(src_path), episode_group=history.episode_group) if not mediainfo: return False, f"未识别到媒体信息,类型:{mtype.value},id:{mediaid}" # 
重新执行整理 @@ -1265,7 +1266,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if history.dest_fileitem: # 解析目标文件对象 dest_fileitem = FileItem(**history.dest_fileitem) - self.storagechain.delete_file(dest_fileitem) + StorageChain().delete_file(dest_fileitem) # 强制整理 if history.src_fileitem: @@ -1320,18 +1321,19 @@ class TransferChain(ChainBase, metaclass=Singleton): if tmdbid or doubanid: # 有输入TMDBID时单个识别 # 识别媒体信息 - mediainfo: MediaInfo = self.mediachain.recognize_media(tmdbid=tmdbid, doubanid=doubanid, - mtype=mtype, episode_group=episode_group) + mediainfo: MediaInfo = MediaChain().recognize_media(tmdbid=tmdbid, doubanid=doubanid, + mtype=mtype, episode_group=episode_group) if not mediainfo: return False, f"媒体信息识别失败,tmdbid:{tmdbid},doubanid:{doubanid},type: {mtype.value}" else: # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) # 开始进度 - self.progress.start(ProgressKey.FileTransfer) - self.progress.update(value=0, - text=f"开始整理 {fileitem.path} ...", - key=ProgressKey.FileTransfer) + progress = ProgressHelper() + progress.start(ProgressKey.FileTransfer) + progress.update(value=0, + text=f"开始整理 {fileitem.path} ...", + key=ProgressKey.FileTransfer) # 开始整理 state, errmsg = self.do_transfer( fileitem=fileitem, @@ -1352,7 +1354,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if not state: return False, errmsg - self.progress.end(ProgressKey.FileTransfer) + progress.end(ProgressKey.FileTransfer) logger.info(f"{fileitem.path} 整理完成") return True, "" else: @@ -1373,7 +1375,8 @@ class TransferChain(ChainBase, metaclass=Singleton): return state, errmsg def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo, - transferinfo: TransferInfo, season_episode: Optional[str] = None, username: Optional[str] = None): + transferinfo: TransferInfo, season_episode: Optional[str] = None, + username: Optional[str] = None): """ 发送入库成功的消息 """ diff --git a/app/chain/tvdb.py b/app/chain/tvdb.py index 2e1b3dae..d6384224 100644 --- a/app/chain/tvdb.py +++ b/app/chain/tvdb.py 
@@ -1,9 +1,9 @@ from typing import List + from app.chain import ChainBase -from app.utils.singleton import Singleton -class TvdbChain(ChainBase, metaclass=Singleton): +class TvdbChain(ChainBase): """ Tvdb处理链,单例运行 """ diff --git a/app/chain/user.py b/app/chain/user.py index 9afd354c..331b1836 100644 --- a/app/chain/user.py +++ b/app/chain/user.py @@ -10,20 +10,15 @@ from app.log import logger from app.schemas import AuthCredentials, AuthInterceptCredentials from app.schemas.types import ChainEventType from app.utils.otp import OtpUtils -from app.utils.singleton import Singleton PASSWORD_INVALID_CREDENTIALS_MESSAGE = "用户名或密码或二次校验码不正确" -class UserChain(ChainBase, metaclass=Singleton): +class UserChain(ChainBase): """ 用户链,处理多种认证协议 """ - def __init__(self): - super().__init__() - self.user_oper = UserOper() - def user_authenticate( self, username: Optional[str] = None, @@ -90,7 +85,8 @@ class UserChain(ChainBase, metaclass=Singleton): logger.debug(f"辅助认证未启用,认证类型 {grant_type} 未实现") return False, "不支持的认证类型" - def password_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]: + @staticmethod + def password_authenticate(credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]: """ 密码认证 @@ -103,7 +99,7 @@ class UserChain(ChainBase, metaclass=Singleton): logger.info("密码认证失败,认证类型不匹配") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE - user = self.user_oper.get_by_name(name=credentials.username) + user = UserOper().get_by_name(name=credentials.username) if not user: logger.info(f"密码认证失败,用户 {credentials.username} 不存在") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE @@ -131,8 +127,9 @@ class UserChain(ChainBase, metaclass=Singleton): return False, "认证凭证无效" # 检查是否因为用户被禁用 + useroper = UserOper() if credentials.username: - user = self.user_oper.get_by_name(name=credentials.username) + user = useroper.get_by_name(name=credentials.username) if user and not user.is_active: logger.info(f"用户 {user.name} 已被禁用,跳过后续身份校验") return False, 
PASSWORD_INVALID_CREDENTIALS_MESSAGE @@ -156,7 +153,7 @@ class UserChain(ChainBase, metaclass=Singleton): success = self._process_auth_success(username=credentials.username, credentials=credentials) if success: logger.info(f"用户 {credentials.username} 辅助认证通过") - return True, self.user_oper.get_by_name(credentials.username) + return True, useroper.get_by_name(credentials.username) else: logger.warning(f"用户 {credentials.username} 辅助认证未通过") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE @@ -213,7 +210,8 @@ class UserChain(ChainBase, metaclass=Singleton): return False # 检查用户是否存在,如果不存在且当前为密码认证时则创建新用户 - user = self.user_oper.get_by_name(name=username) + useroper = UserOper() + user = useroper.get_by_name(name=username) if user: # 如果用户存在,但是已经被禁用,则直接响应 if not user.is_active: @@ -226,8 +224,8 @@ class UserChain(ChainBase, metaclass=Singleton): return True else: if credentials.grant_type == "password": - self.user_oper.add(name=username, is_active=True, is_superuser=False, - hashed_password=get_password_hash(secrets.token_urlsafe(16))) + useroper.add(name=username, is_active=True, is_superuser=False, + hashed_password=get_password_hash(secrets.token_urlsafe(16))) logger.info(f"用户 {username} 不存在,已通过 {credentials.grant_type} 认证并已创建普通用户") return True else: diff --git a/app/chain/webhook.py b/app/chain/webhook.py index a158143b..cd467b5d 100644 --- a/app/chain/webhook.py +++ b/app/chain/webhook.py @@ -2,10 +2,9 @@ from typing import Any from app.chain import ChainBase from app.schemas.types import EventType -from app.utils.singleton import Singleton -class WebhookChain(ChainBase, metaclass=Singleton): +class WebhookChain(ChainBase): """ Webhook处理链 """ diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 175ff59a..200524c7 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -188,16 +188,14 @@ class WorkflowChain(ChainBase): 工作流链 """ - def __init__(self): - super().__init__() - self.workflowoper = WorkflowOper() - - def process(self, workflow_id: int, 
from_begin: Optional[bool] = True) -> Tuple[bool, str]: + @staticmethod + def process(workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]: """ 处理工作流 :param workflow_id: 工作流ID :param from_begin: 是否从头开始,默认为True """ + workflowoper = WorkflowOper() def save_step(action: Action, context: ActionContext): """ @@ -207,16 +205,16 @@ class WorkflowChain(ChainBase): serialized_data = pickle.dumps(context) # 使用Base64编码字节流 encoded_data = base64.b64encode(serialized_data).decode('utf-8') - self.workflowoper.step(workflow_id, action_id=action.id, context={ + workflowoper.step(workflow_id, action_id=action.id, context={ "content": encoded_data }) # 重置工作流 if from_begin: - self.workflowoper.reset(workflow_id) + workflowoper.reset(workflow_id) # 查询工作流数据 - workflow = self.workflowoper.get(workflow_id) + workflow = workflowoper.get(workflow_id) if not workflow: logger.warn(f"工作流 {workflow_id} 不存在") return False, "工作流不存在" @@ -228,7 +226,7 @@ class WorkflowChain(ChainBase): return False, "工作流无流程" logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") - self.workflowoper.start(workflow_id) + workflowoper.start(workflow_id) # 执行工作流 executor = WorkflowExecutor(workflow, step_callback=save_step) @@ -236,15 +234,16 @@ class WorkflowChain(ChainBase): if not executor.success: logger.info(f"工作流 {workflow.name} 执行失败:{executor.errmsg}") - self.workflowoper.fail(workflow_id, result=executor.errmsg) + workflowoper.fail(workflow_id, result=executor.errmsg) return False, executor.errmsg else: logger.info(f"工作流 {workflow.name} 执行完成") - self.workflowoper.success(workflow_id) + workflowoper.success(workflow_id) return True, "" - def get_workflows(self) -> List[Workflow]: + @staticmethod + def get_workflows() -> List[Workflow]: """ 获取工作流列表 """ - return self.workflowoper.list_enabled() + return WorkflowOper().list_enabled() diff --git a/app/helper/message.py b/app/helper/message.py index 43d6cc38..687de919 100644 --- a/app/helper/message.py +++ b/app/helper/message.py 
@@ -61,7 +61,8 @@ class TemplateContextBuilder: self._add_transfer_info(transferinfo) self._add_torrent_info(torrentinfo) self._add_file_info(file_extension) - if kwargs: self._context.update(kwargs) + if kwargs: + self._context.update(kwargs) if include_raw_objects: self._add_raw_objects(meta, mediainfo, torrentinfo, transferinfo, episodes_info) @@ -73,7 +74,8 @@ class TemplateContextBuilder: """ 增加媒体信息 """ - if not mediainfo: return + if not mediainfo: + return season_fmt = f"S{mediainfo.season:02d}" if mediainfo.season is not None else None base_info = { # 标题 @@ -245,7 +247,8 @@ class TemplateContextBuilder: """ 添加文件信息 """ - if not file_extension: return + if not file_extension: + return file_info = { # 文件后缀 "fileExt": file_extension, diff --git a/app/helper/wallpaper.py b/app/helper/wallpaper.py index adacbe27..3403c03e 100644 --- a/app/helper/wallpaper.py +++ b/app/helper/wallpaper.py @@ -1,5 +1,7 @@ from typing import Optional, List +from app.chain.mediaserver import MediaServerChain +from app.chain.tmdb import TmdbChain from app.core.cache import cached from app.core.config import settings from app.utils.http import RequestUtils @@ -11,6 +13,49 @@ class WallpaperHelper(metaclass=Singleton): def __init__(self): self.req = RequestUtils(timeout=5) + @staticmethod + def get_wallpaper() -> Optional[str]: + """ + 获取登录页面壁纸 + """ + if settings.WALLPAPER == "bing": + url = WallpaperHelper().get_bing_wallpaper() + elif settings.WALLPAPER == "mediaserver": + url = MediaServerChain().get_latest_wallpaper() + elif settings.WALLPAPER == "customize": + url = WallpaperHelper().get_customize_wallpaper() + else: + url = WallpaperHelper().get_tmdb_wallpaper() + return url + + @staticmethod + def get_wallpapers(num: int = 10) -> List[str]: + """ + 获取登录页面壁纸列表 + """ + if settings.WALLPAPER == "bing": + return WallpaperHelper().get_bing_wallpapers(num) + elif settings.WALLPAPER == "mediaserver": + return MediaServerChain().get_latest_wallpapers(count=num) + elif settings.WALLPAPER 
== "customize": + return WallpaperHelper().get_customize_wallpapers(num) + else: + return WallpaperHelper().get_tmdb_wallpapers(num) + + @cached(maxsize=1, ttl=3600) + def get_tmdb_wallpaper(self) -> Optional[str]: + """ + 获取TMDB每日壁纸 + """ + return TmdbChain().get_random_wallpager() + + @cached(maxsize=1, ttl=3600) + def get_tmdb_wallpapers(self, num: int = 10) -> List[str]: + """ + 获取7天的TMDB每日壁纸 + """ + return TmdbChain().get_trending_wallpapers(num) + @cached(maxsize=1, ttl=3600) def get_bing_wallpaper(self) -> Optional[str]: """ diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index a3ae1d2b..079d66b2 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI +from app.chain.system import SystemChain from app.core.config import global_vars from app.startup.command_initializer import init_command, stop_command, restart_command from app.startup.memory_initializer import init_memory_manager, stop_memory_manager @@ -23,6 +24,8 @@ async def init_plugin_system(): init_plugin_scheduler() # 重新注册命令 restart_command() + # 重启完成 + SystemChain().restart_finish() @asynccontextmanager