fix:优化单例模式和类引用

This commit is contained in:
jxxghp
2025-06-05 13:22:16 +08:00
parent 504827b7e5
commit a9300faaf8
25 changed files with 511 additions and 552 deletions

View File

@@ -5,9 +5,7 @@ from fastapi import APIRouter, Depends, Form, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from app import schemas
from app.chain.tmdb import TmdbChain
from app.chain.user import UserChain
from app.chain.mediaserver import MediaServerChain
from app.core import security
from app.core.config import settings
from app.helper.sites import SitesHelper
@@ -54,14 +52,7 @@ def wallpaper() -> Any:
"""
获取登录页面电影海报
"""
if settings.WALLPAPER == "bing":
url = WallpaperHelper().get_bing_wallpaper()
elif settings.WALLPAPER == "mediaserver":
url = MediaServerChain().get_latest_wallpaper()
elif settings.WALLPAPER == "customize":
url = WallpaperHelper().get_customize_wallpaper()
else:
url = TmdbChain().get_random_wallpager()
url = WallpaperHelper().get_wallpaper()
if url:
return schemas.Response(
success=True,
@@ -75,13 +66,4 @@ def wallpapers() -> Any:
"""
获取登录页面电影海报
"""
if settings.WALLPAPER == "bing":
return WallpaperHelper().get_bing_wallpapers()
elif settings.WALLPAPER == "mediaserver":
return MediaServerChain().get_latest_wallpapers()
elif settings.WALLPAPER == "tmdb":
return TmdbChain().get_trending_wallpapers()
elif settings.WALLPAPER == "customize":
return WallpaperHelper().get_customize_wallpapers()
else:
return []
return WallpaperHelper().get_wallpaper()

View File

@@ -43,7 +43,6 @@ class ChainBase(metaclass=ABCMeta):
self.messagequeue = MessageQueueManager(
send_callback=self.run_module
)
self.useroper = UserOper()
self.pluginmanager = PluginManager()
@staticmethod
@@ -575,26 +574,27 @@ class ChainBase(metaclass=ABCMeta):
# 是否已发送管理员标志
admin_sended = False
send_orignal = False
useroper = UserOper()
for action in actions:
send_message = copy.deepcopy(message)
if action == "admin" and not admin_sended:
# 仅发送管理员
logger.info(f"{send_message.mtype} 的消息已设置发送给管理员")
# 读取管理员消息IDS
send_message.targets = self.useroper.get_settings(settings.SUPERUSER)
send_message.targets = useroper.get_settings(settings.SUPERUSER)
admin_sended = True
elif action == "user" and send_message.username:
# 发送对应用户
logger.info(f"{send_message.mtype} 的消息已设置发送给用户 {send_message.username}")
# 读取用户消息IDS
send_message.targets = self.useroper.get_settings(send_message.username)
send_message.targets = useroper.get_settings(send_message.username)
if send_message.targets is None:
# 没有找到用户
if not admin_sended:
# 回滚发送管理员
logger.info(f"用户 {send_message.username} 不存在,消息将发送给管理员")
# 读取管理员消息IDS
send_message.targets = self.useroper.get_settings(settings.SUPERUSER)
send_message.targets = useroper.get_settings(settings.SUPERUSER)
admin_sended = True
else:
# 管理员发过了,此消息不发了

View File

@@ -3,12 +3,11 @@ from typing import Optional, List
from app import schemas
from app.chain import ChainBase
from app.core.context import MediaInfo
from app.utils.singleton import Singleton
class BangumiChain(ChainBase, metaclass=Singleton):
class BangumiChain(ChainBase):
"""
Bangumi处理链,单例运行
Bangumi处理链
"""
def calendar(self) -> Optional[List[MediaInfo]]:

View File

@@ -2,10 +2,9 @@ from typing import Optional, List
from app import schemas
from app.chain import ChainBase
from app.utils.singleton import Singleton
class DashboardChain(ChainBase, metaclass=Singleton):
class DashboardChain(ChainBase):
"""
各类仪表板统计处理链
"""

View File

@@ -4,12 +4,11 @@ from app import schemas
from app.chain import ChainBase
from app.core.context import MediaInfo
from app.schemas import MediaType
from app.utils.singleton import Singleton
class DoubanChain(ChainBase, metaclass=Singleton):
class DoubanChain(ChainBase):
"""
豆瓣处理链,单例运行
豆瓣处理链
"""
def person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]:

View File

@@ -16,7 +16,6 @@ from app.core.metainfo import MetaInfo
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.mediaserver_oper import MediaServerOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import ExistMediaInfo, NotExistMediaInfo, DownloadingTorrent, Notification, ResourceSelectionEventData, \
@@ -32,14 +31,6 @@ class DownloadChain(ChainBase):
下载处理链
"""
def __init__(self):
super().__init__()
self.torrent = TorrentHelper()
self.downloadhis = DownloadHistoryOper()
self.mediaserver = MediaServerOper()
self.directoryhelper = DirectoryHelper()
self.messagehelper = MessageHelper()
def download_torrent(self, torrent: TorrentInfo,
channel: MessageChannel = None,
source: Optional[str] = None,
@@ -122,7 +113,7 @@ class DownloadChain(ChainBase):
logger.error(f"{torrent.title} 无法获取下载地址:{torrent.enclosure}")
return None, "", []
# 下载种子文件
torrent_file, content, download_folder, files, error_msg = self.torrent.download_torrent(
torrent_file, content, download_folder, files, error_msg = TorrentHelper().download_torrent(
url=torrent_url,
cookie=site_cookie,
ua=torrent.site_ua or settings.USER_AGENT,
@@ -220,7 +211,7 @@ class DownloadChain(ChainBase):
else:
content = torrent_file
# 获取种子文件的文件夹名和文件清单
_folder_name, _file_list = self.torrent.get_torrent_info(torrent_file)
_folder_name, _file_list = TorrentHelper().get_torrent_info(torrent_file)
# 下载目录
if save_path:
@@ -228,7 +219,7 @@ class DownloadChain(ChainBase):
download_dir = Path(save_path)
else:
# 根据媒体信息查询下载目录配置
dir_info = self.directoryhelper.get_dir(_media, storage="local", include_unsorted=True)
dir_info = DirectoryHelper().get_dir(_media, storage="local", include_unsorted=True)
# 拼装子目录
if dir_info:
# 一级目录
@@ -278,7 +269,8 @@ class DownloadChain(ChainBase):
_save_path = download_dir if _layout == "NoSubfolder" or not _folder_name else download_path
# 登记下载记录
self.downloadhis.add(
downloadhis = DownloadHistoryOper()
downloadhis.add(
path=str(download_path),
type=_media.type.value,
title=_media.title,
@@ -326,7 +318,7 @@ class DownloadChain(ChainBase):
"torrentname": _meta.org_string,
})
if files_to_add:
self.downloadhis.add_files(files_to_add)
downloadhis.add_files(files_to_add)
# 下载成功发送消息
self.post_message(
@@ -553,7 +545,7 @@ class DownloadChain(ChainBase):
if isinstance(content, str):
logger.warn(f"{meta.org_string} 下载地址是磁力链,无法确定种子文件集数")
continue
torrent_episodes = self.torrent.get_torrent_episodes(torrent_files)
torrent_episodes = TorrentHelper().get_torrent_episodes(torrent_files)
logger.info(f"{meta.org_string} 解析种子文件集数为 {torrent_episodes}")
if not torrent_episodes:
continue
@@ -759,7 +751,7 @@ class DownloadChain(ChainBase):
logger.warn(f"{meta.org_string} 下载地址是磁力链,无法解析种子文件集数")
continue
# 种子全部集
torrent_episodes = self.torrent.get_torrent_episodes(torrent_files)
torrent_episodes = TorrentHelper().get_torrent_episodes(torrent_files)
logger.info(f"{torrent.site_name} - {meta.org_string} 解析种子文件集数:{torrent_episodes}")
# 选中的集
selected_episodes = set(torrent_episodes).intersection(set(need_episodes))
@@ -848,11 +840,13 @@ class DownloadChain(ChainBase):
if not totals:
totals = {}
mediaserver = MediaServerOper()
if mediainfo.type == MediaType.MOVIE:
# 电影
itemid = self.mediaserver.get_item_id(mtype=mediainfo.type.value,
title=mediainfo.title,
tmdbid=mediainfo.tmdb_id)
itemid = mediaserver.get_item_id(mtype=mediainfo.type.value,
title=mediainfo.title,
tmdbid=mediainfo.tmdb_id)
exists_movies: Optional[ExistMediaInfo] = self.media_exists(mediainfo=mediainfo, itemid=itemid)
if exists_movies:
logger.info(f"媒体库中已存在电影:{mediainfo.title_year}")
@@ -872,10 +866,10 @@ class DownloadChain(ChainBase):
logger.error(f"媒体信息中没有季集信息:{mediainfo.title_year}")
return False, {}
# 电视剧
itemid = self.mediaserver.get_item_id(mtype=mediainfo.type.value,
title=mediainfo.title,
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season)
itemid = mediaserver.get_item_id(mtype=mediainfo.type.value,
title=mediainfo.title,
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season)
# 媒体库已存在的剧集
exists_tvs: Optional[ExistMediaInfo] = self.media_exists(mediainfo=mediainfo, itemid=itemid)
if not exists_tvs:
@@ -974,7 +968,7 @@ class DownloadChain(ChainBase):
return []
ret_torrents = []
for torrent in torrents:
history = self.downloadhis.get_by_hash(torrent.hash)
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
# 媒体信息
torrent.media = {

View File

@@ -14,7 +14,6 @@ from app.log import logger
from app.schemas import FileItem
from app.schemas.types import EventType, MediaType, ChainEventType
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
recognize_lock = Lock()
@@ -22,15 +21,11 @@ scraping_lock = Lock()
scraping_files = []
class MediaChain(ChainBase, metaclass=Singleton):
class MediaChain(ChainBase):
"""
媒体信息处理链,单例运行
"""
def __init__(self):
super().__init__()
self.storagechain = StorageChain()
def metadata_nfo(self, meta: MetaBase, mediainfo: MediaInfo,
season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]:
"""
@@ -337,6 +332,8 @@ class MediaChain(ChainBase, metaclass=Singleton):
:param overwrite: 是否覆盖已有文件
"""
storagechain = StorageChain()
def is_bluray_folder(_fileitem: schemas.FileItem) -> bool:
"""
判断是否为原盘目录
@@ -346,7 +343,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
# 蓝光原盘目录必备的文件或文件夹
required_files = ['BDMV', 'CERTIFICATE']
# 检查目录下是否存在所需文件或文件夹
for item in self.storagechain.list_files(_fileitem):
for item in storagechain.list_files(_fileitem):
if item.name in required_files:
return True
return False
@@ -355,7 +352,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
"""
列出下级文件
"""
return self.storagechain.list_files(fileitem=_fileitem)
return storagechain.list_files(fileitem=_fileitem)
def __save_file(_fileitem: schemas.FileItem, _path: Path, _content: Union[bytes, str]):
"""
@@ -371,7 +368,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
tmp_file.write_bytes(_content)
# 获取文件的父目录
try:
item = self.storagechain.upload_file(fileitem=_fileitem, path=tmp_file, new_name=_path.name)
item = storagechain.upload_file(fileitem=_fileitem, path=tmp_file, new_name=_path.name)
if item:
logger.info(f"已保存文件:{item.path}")
else:
@@ -413,7 +410,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
if fileitem.type == "file":
# 是否已存在
nfo_path = filepath.with_suffix(".nfo")
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
# 电影文件
movie_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo)
if movie_nfo:
@@ -428,7 +425,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
if is_bluray_folder(fileitem):
# 原盘目录
nfo_path = filepath / (filepath.name + ".nfo")
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
# 生成原盘nfo
movie_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo)
if movie_nfo:
@@ -453,8 +450,8 @@ class MediaChain(ChainBase, metaclass=Singleton):
if image_dict:
for image_name, image_url in image_dict.items():
image_path = filepath.with_name(image_name)
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
# 下载图片
content = __download_image(image_url)
# 写入图片到当前目录
@@ -477,7 +474,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
return
# 是否已存在
nfo_path = filepath.with_suffix(".nfo")
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
# 获取集的nfo文件
episode_nfo = self.metadata_nfo(meta=file_meta, mediainfo=file_mediainfo,
season=file_meta.begin_season,
@@ -485,7 +482,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
if episode_nfo:
# 保存或上传nfo文件到上级目录
if not parent:
parent = self.storagechain.get_parent_item(fileitem)
parent = storagechain.get_parent_item(fileitem)
__save_file(_fileitem=parent, _path=nfo_path, _content=episode_nfo)
else:
logger.warn(f"{filepath.name} nfo文件生成失败")
@@ -497,13 +494,13 @@ class MediaChain(ChainBase, metaclass=Singleton):
if image_dict:
for episode, image_url in image_dict.items():
image_path = filepath.with_suffix(Path(image_url).suffix)
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=image_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=image_path):
# 下载图片
content = __download_image(image_url)
# 保存图片文件到当前目录
if content:
if not parent:
parent = self.storagechain.get_parent_item(fileitem)
parent = storagechain.get_parent_item(fileitem)
__save_file(_fileitem=parent, _path=image_path, _content=content)
else:
logger.info(f"已存在图片文件:{image_path}")
@@ -526,7 +523,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
if season_meta.begin_season is not None:
# 是否已存在
nfo_path = filepath / "season.nfo"
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
# 当前目录有季号生成季nfo
season_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo,
season=season_meta.begin_season)
@@ -542,14 +539,14 @@ class MediaChain(ChainBase, metaclass=Singleton):
if image_dict:
for image_name, image_url in image_dict.items():
image_path = filepath.with_name(image_name)
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
# 下载图片
content = __download_image(image_url)
# 保存图片文件到剧集目录
if content:
if not parent:
parent = self.storagechain.get_parent_item(fileitem)
parent = storagechain.get_parent_item(fileitem)
__save_file(_fileitem=parent, _path=image_path, _content=content)
else:
logger.info(f"已存在图片文件:{image_path}")
@@ -564,14 +561,14 @@ class MediaChain(ChainBase, metaclass=Singleton):
if image_season != str(season_meta.begin_season).rjust(2, '0'):
logger.info(f"当前刮削季为:{season_meta.begin_season},跳过文件:{image_path}")
continue
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
# 下载图片
content = __download_image(image_url)
# 保存图片文件到当前目录
if content:
if not parent:
parent = self.storagechain.get_parent_item(fileitem)
parent = storagechain.get_parent_item(fileitem)
__save_file(_fileitem=parent, _path=image_path, _content=content)
else:
logger.info(f"已存在图片文件:{image_path}")
@@ -579,7 +576,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
if not season_meta.season:
# 是否已存在
nfo_path = filepath / "tvshow.nfo"
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
# 当前目录有名称生成tvshow nfo 和 tv图片
tv_nfo = self.metadata_nfo(meta=meta, mediainfo=mediainfo)
if tv_nfo:
@@ -597,8 +594,8 @@ class MediaChain(ChainBase, metaclass=Singleton):
if image_name.startswith("season"):
continue
image_path = filepath / image_name
if overwrite or not self.storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
if overwrite or not storagechain.get_file_item(storage=fileitem.storage,
path=image_path):
# 下载图片
content = __download_image(image_url)
# 保存图片文件到当前目录

View File

@@ -17,10 +17,6 @@ class MediaServerChain(ChainBase):
媒体服务器处理链
"""
def __init__(self):
super().__init__()
self.dboper = MediaServerOper()
def librarys(self, server: str, username: Optional[str] = None,
hidden: bool = False) -> List[MediaServerLibrary]:
"""
@@ -131,7 +127,8 @@ class MediaServerChain(ChainBase):
# 汇总统计
total_count = 0
# 清空登记薄
self.dboper.empty()
dboper = MediaServerOper()
dboper.empty()
# 遍历媒体服务器
for mediaserver in mediaservers:
if not mediaserver:
@@ -175,7 +172,7 @@ class MediaServerChain(ChainBase):
item_dict = item.dict()
item_dict["seasoninfo"] = seasoninfo
item_dict["item_type"] = item_type
self.dboper.add(**item_dict)
dboper.add(**item_dict)
logger.info(f"{server_name} 媒体库 {library.name} 同步完成,共同步数量:{library_count}")
# 总数累加
total_count += library_count

View File

@@ -9,10 +9,8 @@ from app.chain.search import SearchChain
from app.chain.subscribe import SubscribeChain
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.event import EventManager
from app.core.meta import MetaBase
from app.db.message_oper import MessageOper
from app.helper.message import MessageHelper
from app.db.user_oper import UserOper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import Notification, NotExistMediaInfo, CommingMessage
@@ -36,19 +34,8 @@ class MessageChain(ChainBase):
# 每页数据量
_page_size: int = 8
def __init__(self):
super().__init__()
self.downloadchain = DownloadChain()
self.subscribechain = SubscribeChain()
self.searchchain = SearchChain()
self.mediachain = MediaChain()
self.eventmanager = EventManager()
self.torrenthelper = TorrentHelper()
self.messagehelper = MessageHelper()
self.messageoper = MessageOper()
@staticmethod
def __get_noexits_info(
self,
_meta: MetaBase,
_mediainfo: MediaInfo) -> Dict[Union[int, str], Dict[int, NotExistMediaInfo]]:
"""
@@ -57,10 +44,10 @@ class MessageChain(ChainBase):
if _mediainfo.type == MediaType.TV:
if not _mediainfo.seasons:
# 补充媒体信息
_mediainfo = self.mediachain.recognize_media(mtype=_mediainfo.type,
tmdbid=_mediainfo.tmdb_id,
doubanid=_mediainfo.douban_id,
cache=False)
_mediainfo = MediaChain().recognize_media(mtype=_mediainfo.type,
tmdbid=_mediainfo.tmdb_id,
doubanid=_mediainfo.douban_id,
cache=False)
if not _mediainfo:
logger.warn(f"{_mediainfo.tmdb_id or _mediainfo.douban_id} 媒体信息识别失败!")
return {}
@@ -193,8 +180,8 @@ class MessageChain(ChainBase):
mediainfo: MediaInfo = cache_list[_choice]
_current_media = mediainfo
# 查询缺失的媒体信息
exist_flag, no_exists = self.downloadchain.get_no_exists_info(meta=_current_meta,
mediainfo=_current_media)
exist_flag, no_exists = DownloadChain().get_no_exists_info(meta=_current_meta,
mediainfo=_current_media)
if exist_flag and cache_type == "Search":
# 媒体库中已存在
self.post_message(
@@ -234,8 +221,8 @@ class MessageChain(ChainBase):
title=f"开始搜索 {mediainfo.type.value} {mediainfo.title_year} ...",
userid=userid))
# 开始搜索
contexts = self.searchchain.process(mediainfo=mediainfo,
no_exists=no_exists)
contexts = SearchChain().process(mediainfo=mediainfo,
no_exists=no_exists)
if not contexts:
# 没有数据
self.post_message(Notification(
@@ -246,7 +233,7 @@ class MessageChain(ChainBase):
userid=userid))
return
# 搜索结果排序
contexts = self.torrenthelper.sort_torrents(contexts)
contexts = TorrentHelper().sort_torrents(contexts)
# 判断是否设置自动下载
auto_download_user = settings.AUTO_DOWNLOAD_USER
# 匹配到自动下载用户
@@ -287,8 +274,8 @@ class MessageChain(ChainBase):
best_version = False
# 查询缺失的媒体信息
if cache_type == "Subscribe":
exist_flag, _ = self.downloadchain.get_no_exists_info(meta=_current_meta,
mediainfo=mediainfo)
exist_flag, _ = DownloadChain().get_no_exists_info(meta=_current_meta,
mediainfo=mediainfo)
if exist_flag:
self.post_message(Notification(
channel=channel,
@@ -300,18 +287,18 @@ class MessageChain(ChainBase):
else:
best_version = True
# 转换用户名
mp_name = self.useroper.get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None
mp_name = UserOper().get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None
# 添加订阅状态为N
self.subscribechain.add(title=mediainfo.title,
year=mediainfo.year,
mtype=mediainfo.type,
tmdbid=mediainfo.tmdb_id,
season=_current_meta.begin_season,
channel=channel,
source=source,
userid=userid,
username=mp_name or username,
best_version=best_version)
SubscribeChain().add(title=mediainfo.title,
year=mediainfo.year,
mtype=mediainfo.type,
tmdbid=mediainfo.tmdb_id,
season=_current_meta.begin_season,
channel=channel,
source=source,
userid=userid,
username=mp_name or username,
best_version=best_version)
elif cache_type == "Torrent":
if int(text) == 0:
# 自动选择下载,强制下载模式
@@ -324,8 +311,8 @@ class MessageChain(ChainBase):
# 下载种子
context: Context = cache_list[_choice]
# 下载
self.downloadchain.download_single(context, channel=channel, source=source,
userid=userid, username=username)
DownloadChain().download_single(context, channel=channel, source=source,
userid=userid, username=username)
elif text.lower() == "p":
# 上一页
@@ -444,7 +431,7 @@ class MessageChain(ChainBase):
if action in ["Search", "ReSearch", "Subscribe", "ReSubscribe"]:
# 搜索
meta, medias = self.mediachain.search(content)
meta, medias = MediaChain().search(content)
# 识别
if not meta.name:
self.post_message(Notification(
@@ -497,9 +484,10 @@ class MessageChain(ChainBase):
"""
自动择优下载
"""
downloadchain = DownloadChain()
if no_exists is None:
# 查询缺失的媒体信息
exist_flag, no_exists = self.downloadchain.get_no_exists_info(
exist_flag, no_exists = downloadchain.get_no_exists_info(
meta=_current_meta,
mediainfo=_current_media
)
@@ -508,12 +496,12 @@ class MessageChain(ChainBase):
no_exists = self.__get_noexits_info(_current_meta, _current_media)
# 批量下载
downloads, lefts = self.downloadchain.batch_download(contexts=cache_list,
no_exists=no_exists,
channel=channel,
source=source,
userid=userid,
username=username)
downloads, lefts = downloadchain.batch_download(contexts=cache_list,
no_exists=no_exists,
channel=channel,
source=source,
userid=userid,
username=username)
if downloads and not lefts:
# 全部下载完成
logger.info(f'{_current_media.title_year} 下载完成')
@@ -528,19 +516,19 @@ class MessageChain(ChainBase):
else:
note = None
# 转换用户名
mp_name = self.useroper.get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None
mp_name = UserOper().get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None
# 添加订阅状态为R
self.subscribechain.add(title=_current_media.title,
year=_current_media.year,
mtype=_current_media.type,
tmdbid=_current_media.tmdb_id,
season=_current_meta.begin_season,
channel=channel,
source=source,
userid=userid,
username=mp_name or username,
state="R",
note=note)
SubscribeChain().add(title=_current_media.title,
year=_current_media.year,
mtype=_current_media.type,
tmdbid=_current_media.tmdb_id,
season=_current_meta.begin_season,
channel=channel,
source=source,
userid=userid,
username=mp_name or username,
state="R",
note=note)
def __post_medias_message(self, channel: MessageChannel, source: str,
title: str, items: list, userid: str, total: int):

View File

@@ -29,12 +29,8 @@ class RecommendChain(ChainBase, metaclass=Singleton):
推荐处理链,单例运行
"""
def __init__(self):
super().__init__()
self.tmdbchain = TmdbChain()
self.doubanchain = DoubanChain()
self.bangumichain = BangumiChain()
self.cache_max_pages = 5
# 推荐数据的缓存页数
cache_max_pages = 5
def refresh_recommend(self):
"""
@@ -174,16 +170,16 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
TMDB热门电影
"""
movies = self.tmdbchain.tmdb_discover(mtype=MediaType.MOVIE,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
movies = TmdbChain().tmdb_discover(mtype=MediaType.MOVIE,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
return [movie.to_dict() for movie in movies] if movies else []
@log_execution_time(logger=logger)
@@ -200,16 +196,16 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
TMDB热门电视剧
"""
tvs = self.tmdbchain.tmdb_discover(mtype=MediaType.TV,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
tvs = TmdbChain().tmdb_discover(mtype=MediaType.TV,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
return [tv.to_dict() for tv in tvs] if tvs else []
@log_execution_time(logger=logger)
@@ -218,7 +214,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
TMDB流行趋势
"""
infos = self.tmdbchain.tmdb_trending(page=page)
infos = TmdbChain().tmdb_trending(page=page)
return [info.to_dict() for info in infos] if infos else []
@log_execution_time(logger=logger)
@@ -227,7 +223,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
Bangumi每日放送
"""
medias = self.bangumichain.calendar()
medias = BangumiChain().calendar()
return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else []
@log_execution_time(logger=logger)
@@ -236,7 +232,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣正在热映
"""
movies = self.doubanchain.movie_showing(page=page, count=count)
movies = DoubanChain().movie_showing(page=page, count=count)
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@@ -246,8 +242,8 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣最新电影
"""
movies = self.doubanchain.douban_discover(mtype=MediaType.MOVIE,
sort=sort, tags=tags, page=page, count=count)
movies = DoubanChain().douban_discover(mtype=MediaType.MOVIE,
sort=sort, tags=tags, page=page, count=count)
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@@ -257,8 +253,8 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣最新电视剧
"""
tvs = self.doubanchain.douban_discover(mtype=MediaType.TV,
sort=sort, tags=tags, page=page, count=count)
tvs = DoubanChain().douban_discover(mtype=MediaType.TV,
sort=sort, tags=tags, page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@@ -267,7 +263,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣电影TOP250
"""
movies = self.doubanchain.movie_top250(page=page, count=count)
movies = DoubanChain().movie_top250(page=page, count=count)
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@@ -276,7 +272,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣国产剧集榜
"""
tvs = self.doubanchain.tv_weekly_chinese(page=page, count=count)
tvs = DoubanChain().tv_weekly_chinese(page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@@ -285,7 +281,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣全球剧集榜
"""
tvs = self.doubanchain.tv_weekly_global(page=page, count=count)
tvs = DoubanChain().tv_weekly_global(page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@@ -294,7 +290,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣热门动漫
"""
tvs = self.doubanchain.tv_animation(page=page, count=count)
tvs = DoubanChain().tv_animation(page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@@ -303,7 +299,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣热门电影
"""
movies = self.doubanchain.movie_hot(page=page, count=count)
movies = DoubanChain().movie_hot(page=page, count=count)
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@@ -312,5 +308,5 @@ class RecommendChain(ChainBase, metaclass=Singleton):
"""
豆瓣热门电视剧
"""
tvs = self.doubanchain.tv_hot(page=page, count=count)
tvs = DoubanChain().tv_hot(page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []

View File

@@ -27,13 +27,6 @@ class SearchChain(ChainBase):
__result_temp_file = "__search_result__"
def __init__(self):
super().__init__()
self.siteshelper = SitesHelper()
self.progress = ProgressHelper()
self.systemconfig = SystemConfigOper()
self.torrenthelper = TorrentHelper()
def search_by_id(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
mtype: MediaType = None, area: Optional[str] = "title", season: Optional[int] = None,
sites: List[int] = None, cache_local: bool = False) -> List[Context]:
@@ -184,19 +177,20 @@ class SearchChain(ChainBase):
return []
# 开始新进度
self.progress.start(ProgressKey.Search)
progress = ProgressHelper()
progress.start(ProgressKey.Search)
# 开始过滤
self.progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...',
key=ProgressKey.Search)
progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...',
key=ProgressKey.Search)
# 匹配订阅附加参数
if filter_params:
logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...')
torrents = [torrent for torrent in torrents if self.torrenthelper.filter_torrent(torrent, filter_params)]
torrents = [torrent for torrent in torrents if TorrentHelper().filter_torrent(torrent, filter_params)]
# 开始过滤规则过滤
if rule_groups is None:
# 取搜索过滤规则
rule_groups: List[str] = self.systemconfig.get(SystemConfigKey.SearchFilterRuleGroups)
rule_groups: List[str] = SystemConfigOper().get(SystemConfigKey.SearchFilterRuleGroups)
if rule_groups:
logger.info(f'开始过滤规则/剧集过滤,使用规则组:{rule_groups} ...')
torrents = __do_filter(torrents)
@@ -206,7 +200,7 @@ class SearchChain(ChainBase):
logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源")
# 过滤完成
self.progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search)
progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search)
# 开始匹配
_match_torrents = []
@@ -215,17 +209,19 @@ class SearchChain(ChainBase):
# 已处理数
_count = 0
torrenthelper = TorrentHelper()
if mediainfo:
# 英文标题应该在别名/原标题中,不需要再匹配
logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}")
self.progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search)
progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search)
for torrent in torrents:
if global_vars.is_system_stopped:
break
_count += 1
self.progress.update(value=(_count / _total) * 96,
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...',
key=ProgressKey.Search)
progress.update(value=(_count / _total) * 96,
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...',
key=ProgressKey.Search)
if not torrent.title:
continue
@@ -236,10 +232,9 @@ class SearchChain(ChainBase):
logger.info(f"种子名称应用识别词后发生改变:{torrent.title} => {torrent_meta.org_string}")
# 季集数过滤
if season_episodes \
and not self.torrenthelper.match_season_episodes(
torrent=torrent,
meta=torrent_meta,
season_episodes=season_episodes):
and not torrenthelper.match_season_episodes(torrent=torrent,
meta=torrent_meta,
season_episodes=season_episodes):
continue
# 比对IMDBID
if torrent.imdbid \
@@ -250,17 +245,17 @@ class SearchChain(ChainBase):
continue
# 比对种子
if self.torrenthelper.match_torrent(mediainfo=mediainfo,
torrent_meta=torrent_meta,
torrent=torrent):
if torrenthelper.match_torrent(mediainfo=mediainfo,
torrent_meta=torrent_meta,
torrent=torrent):
# 匹配成功
_match_torrents.append((torrent, torrent_meta))
continue
# 匹配完成
logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源")
self.progress.update(value=97,
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源',
key=ProgressKey.Search)
progress.update(value=97,
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源',
key=ProgressKey.Search)
else:
_match_torrents = [(t, MetaInfo(title=t.title, subtitle=t.description)) for t in torrents]
@@ -273,17 +268,17 @@ class SearchChain(ChainBase):
meta_info=t[1]) for t in _match_torrents]
# 排序
self.progress.update(value=99,
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...',
key=ProgressKey.Search)
contexts = self.torrenthelper.sort_torrents(contexts)
progress.update(value=99,
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...',
key=ProgressKey.Search)
contexts = torrenthelper.sort_torrents(contexts)
# 结束进度
logger.info(f'搜索完成,共 {len(contexts)} 个资源')
self.progress.update(value=100,
text=f'搜索完成,共 {len(contexts)} 个资源',
key=ProgressKey.Search)
self.progress.end(ProgressKey.Search)
progress.update(value=100,
text=f'搜索完成,共 {len(contexts)} 个资源',
key=ProgressKey.Search)
progress.end(ProgressKey.Search)
# 返回
return contexts
@@ -307,9 +302,9 @@ class SearchChain(ChainBase):
# 配置的索引站点
if not sites:
sites = self.systemconfig.get(SystemConfigKey.IndexerSites) or []
sites = SystemConfigOper().get(SystemConfigKey.IndexerSites) or []
for indexer in self.siteshelper.get_indexers():
for indexer in SitesHelper().get_indexers():
# 检查站点索引开关
if not sites or indexer.get("id") in sites:
indexer_sites.append(indexer)
@@ -318,7 +313,8 @@ class SearchChain(ChainBase):
return []
# 开始进度
self.progress.start(ProgressKey.Search)
progress = ProgressHelper()
progress.start(ProgressKey.Search)
# 开始计时
start_time = datetime.now()
# 总数
@@ -326,9 +322,9 @@ class SearchChain(ChainBase):
# 完成数
finish_count = 0
# 更新进度
self.progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
# 结果集
results = []
# 多线程
@@ -356,18 +352,18 @@ class SearchChain(ChainBase):
if result:
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
self.progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keywords or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keywords or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
# 计算耗时
end_time = datetime.now()
# 更新进度
self.progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 结束进度
self.progress.end(ProgressKey.Search)
progress.end(ProgressKey.Search)
# 返回
return results

View File

@@ -16,7 +16,6 @@ from app.helper.browser import PlaywrightHelper
from app.helper.cloudflare import under_challenge
from app.helper.cookie import CookieHelper
from app.helper.cookiecloud import CookieCloudHelper
from app.helper.message import MessageHelper
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.log import logger
@@ -34,13 +33,6 @@ class SiteChain(ChainBase):
def __init__(self):
super().__init__()
self.siteoper = SiteOper()
self.siteshelper = SitesHelper()
self.rsshelper = RssHelper()
self.cookiehelper = CookieHelper()
self.message = MessageHelper()
self.cookiecloud = CookieCloudHelper()
self.systemconfig = SystemConfigOper()
# 特殊站点登录验证
self.special_site_test = {
@@ -62,9 +54,9 @@ class SiteChain(ChainBase):
"""
userdata: SiteUserData = self.run_module("refresh_userdata", site=site)
if userdata:
self.siteoper.update_userdata(domain=StringUtils.get_url_domain(site.get("domain")),
name=site.get("name"),
payload=userdata.dict())
SiteOper().update_userdata(domain=StringUtils.get_url_domain(site.get("domain")),
name=site.get("name"),
payload=userdata.dict())
# 发送事件
EventManager().send_event(EventType.SiteRefreshed, {
"site_id": site.get("id")
@@ -100,7 +92,7 @@ class SiteChain(ChainBase):
"""
刷新所有站点的用户数据
"""
sites = self.siteshelper.get_indexers()
sites = SitesHelper().get_indexers()
any_site_updated = False
result = {}
for site in sites:
@@ -303,21 +295,24 @@ class SiteChain(ChainBase):
return sub_domain
logger.info("开始同步CookieCloud站点 ...")
cookies, msg = self.cookiecloud.download()
cookies, msg = CookieCloudHelper().download()
if not cookies:
logger.error(f"CookieCloud同步失败{msg}")
if manual:
self.message.put(msg, title="CookieCloud同步失败", role="system")
self.messagehelper.put(msg, title="CookieCloud同步失败", role="system")
return False, msg
# 保存Cookie或新增站点
_update_count = 0
_add_count = 0
_fail_count = 0
siteshelper = SitesHelper()
siteoper = SiteOper()
rsshelper = RssHelper()
for domain, cookie in cookies.items():
# 索引器信息
indexer = self.siteshelper.get_indexer(domain)
indexer = siteshelper.get_indexer(domain)
# 数据库的站点信息
site_info = self.siteoper.get_by_domain(domain)
site_info = siteoper.get_by_domain(domain)
if site_info and site_info.is_active == 1:
# 站点已存在,检查站点连通性
status, msg = self.test(domain)
@@ -327,7 +322,7 @@ class SiteChain(ChainBase):
# 更新站点rss地址
if not site_info.public and not site_info.rss:
# 自动生成rss地址
rss_url, errmsg = self.rsshelper.get_rss_link(
rss_url, errmsg = rsshelper.get_rss_link(
url=site_info.url,
cookie=cookie,
ua=site_info.ua or settings.USER_AGENT,
@@ -335,13 +330,13 @@ class SiteChain(ChainBase):
)
if rss_url:
logger.info(f"更新站点 {domain} RSS地址 ...")
self.siteoper.update_rss(domain=domain, rss=rss_url)
siteoper.update_rss(domain=domain, rss=rss_url)
else:
logger.warn(errmsg)
continue
# 更新站点Cookie
logger.info(f"更新站点 {domain} Cookie ...")
self.siteoper.update_cookie(domain=domain, cookies=cookie)
siteoper.update_cookie(domain=domain, cookies=cookie)
_update_count += 1
elif indexer:
if settings.COOKIECLOUD_BLACKLIST and any(
@@ -396,21 +391,21 @@ class SiteChain(ChainBase):
rss_url = None
if not indexer.get("public") and domain_url:
# 自动生成rss地址
rss_url, errmsg = self.rsshelper.get_rss_link(url=domain_url,
cookie=cookie,
ua=settings.USER_AGENT,
proxy=proxy)
rss_url, errmsg = rsshelper.get_rss_link(url=domain_url,
cookie=cookie,
ua=settings.USER_AGENT,
proxy=proxy)
if errmsg:
logger.warn(errmsg)
# 插入数据库
logger.info(f"新增站点 {indexer.get('name')} ...")
self.siteoper.add(name=indexer.get("name"),
url=domain_url,
domain=domain,
cookie=cookie,
rss=rss_url,
proxy=1 if proxy else 0,
public=1 if indexer.get("public") else 0)
siteoper.add(name=indexer.get("name"),
url=domain_url,
domain=domain,
cookie=cookie,
rss=rss_url,
proxy=1 if proxy else 0,
public=1 if indexer.get("public") else 0)
_add_count += 1
# 通知站点更新
@@ -423,7 +418,7 @@ class SiteChain(ChainBase):
if _fail_count > 0:
ret_msg += f"{_fail_count}个站点添加失败,下次同步时将重试,也可以手动添加"
if manual:
self.message.put(ret_msg, title="CookieCloud同步成功", role="system")
self.messagehelper.put(ret_msg, title="CookieCloud同步成功", role="system")
logger.info(f"CookieCloud同步成功{ret_msg}")
return True, ret_msg
@@ -442,29 +437,31 @@ class SiteChain(ChainBase):
if str(domain).startswith("http"):
domain = StringUtils.get_url_domain(domain)
# 站点信息
siteinfo = self.siteoper.get_by_domain(domain)
siteoper = SiteOper()
siteshelper = SitesHelper()
siteinfo = siteoper.get_by_domain(domain)
if not siteinfo:
logger.warn(f"未维护站点 {domain} 信息!")
return
# Cookie
cookie = siteinfo.cookie
# 索引器
indexer = self.siteshelper.get_indexer(domain)
indexer = siteshelper.get_indexer(domain)
if not indexer:
logger.warn(f"站点 {domain} 索引器不存在!")
return
# 查询站点图标
site_icon = self.siteoper.get_icon_by_domain(domain)
site_icon = siteoper.get_icon_by_domain(domain)
if not site_icon or not site_icon.base64:
logger.info(f"开始缓存站点 {indexer.get('name')} 图标 ...")
icon_url, icon_base64 = self.__parse_favicon(url=indexer.get("domain"),
cookie=cookie,
ua=settings.USER_AGENT)
if icon_url:
self.siteoper.update_icon(name=indexer.get("name"),
domain=domain,
icon_url=icon_url,
icon_base64=icon_base64)
siteoper.update_icon(name=indexer.get("name"),
domain=domain,
icon_url=icon_url,
icon_base64=icon_base64)
logger.info(f"缓存站点 {indexer.get('name')} 图标成功")
else:
logger.warn(f"缓存站点 {indexer.get('name')} 图标失败")
@@ -484,11 +481,12 @@ class SiteChain(ChainBase):
# 获取主域名中间那段
domain_host = StringUtils.get_url_host(domain)
# 查询以"site.domain_host"开头的配置项,并清除
site_keys = self.systemconfig.all().keys()
systemconfig = SystemConfigOper()
site_keys = systemconfig.all().keys()
for key in site_keys:
if key.startswith(f"site.{domain_host}"):
logger.info(f"清理站点配置:{key}")
self.systemconfig.delete(key)
systemconfig.delete(key)
@eventmanager.register(EventType.SiteUpdated)
def cache_site_userdata(self, event: Event):
@@ -504,7 +502,7 @@ class SiteChain(ChainBase):
return
if str(domain).startswith("http"):
domain = StringUtils.get_url_domain(domain)
indexer = self.siteshelper.get_indexer(domain)
indexer = SitesHelper().get_indexer(domain)
if not indexer:
return
# 刷新站点用户数据
@@ -518,7 +516,8 @@ class SiteChain(ChainBase):
"""
# 检查域名是否可用
domain = StringUtils.get_url_domain(url)
site_info = self.siteoper.get_by_domain(domain)
siteoper = SiteOper()
site_info = siteoper.get_by_domain(domain)
if not site_info:
return False, f"站点【{url}】不存在"
@@ -535,9 +534,9 @@ class SiteChain(ChainBase):
# 统计
seconds = (datetime.now() - start_time).seconds
if state:
self.siteoper.success(domain=domain, seconds=seconds)
siteoper.success(domain=domain, seconds=seconds)
else:
self.siteoper.fail(domain)
siteoper.fail(domain)
return state, message
except Exception as e:
return False, f"{str(e)}"
@@ -593,7 +592,7 @@ class SiteChain(ChainBase):
"""
查询所有站点,发送消息
"""
site_list = self.siteoper.list()
site_list = SiteOper().list()
if not site_list:
self.post_message(Notification(
channel=channel,
@@ -633,7 +632,8 @@ class SiteChain(ChainBase):
if not arg_str.isdigit():
return
site_id = int(arg_str)
site = self.siteoper.get(site_id)
siteoper = SiteOper()
site = siteoper.get(site_id)
if not site:
self.post_message(Notification(
channel=channel,
@@ -641,7 +641,7 @@ class SiteChain(ChainBase):
userid=userid))
return
# 禁用站点
self.siteoper.update(site_id, {
siteoper.update(site_id, {
"is_active": False
})
# 重新发送消息
@@ -655,25 +655,27 @@ class SiteChain(ChainBase):
if not arg_str:
return
arg_strs = str(arg_str).split()
siteoper = SiteOper()
for arg_str in arg_strs:
arg_str = arg_str.strip()
if not arg_str.isdigit():
continue
site_id = int(arg_str)
site = self.siteoper.get(site_id)
site = siteoper.get(site_id)
if not site:
self.post_message(Notification(
channel=channel,
title=f"站点编号 {site_id} 不存在!", userid=userid))
return
# 禁用站点
self.siteoper.update(site_id, {
siteoper.update(site_id, {
"is_active": True
})
# 重新发送消息
self.remote_list(channel=channel, userid=userid, source=source)
def update_cookie(self, site_info: Site,
@staticmethod
def update_cookie(site_info: Site,
username: str, password: str, two_step_code: Optional[str] = None) -> Tuple[bool, str]:
"""
根据用户名密码更新站点Cookie
@@ -684,7 +686,7 @@ class SiteChain(ChainBase):
:return: (是否成功, 错误信息)
"""
# 更新站点Cookie
result = self.cookiehelper.get_site_cookie_ua(
result = CookieHelper().get_site_cookie_ua(
url=site_info.url,
username=username,
password=password,
@@ -695,7 +697,7 @@ class SiteChain(ChainBase):
cookie, ua, msg = result
if not cookie:
return False, msg
self.siteoper.update(site_info.id, {
SiteOper().update(site_info.id, {
"cookie": cookie,
"ua": ua
})
@@ -737,7 +739,7 @@ class SiteChain(ChainBase):
# 站点ID
site_id = int(site_id)
# 站点信息
site_info = self.siteoper.get(site_id)
site_info = SiteOper().get(site_id)
if not site_info:
self.post_message(Notification(
channel=channel,

View File

@@ -14,10 +14,6 @@ class StorageChain(ChainBase):
存储处理链
"""
def __init__(self):
super().__init__()
self.directoryhelper = DirectoryHelper()
def save_config(self, storage: str, conf: dict) -> None:
"""
保存存储配置
@@ -192,7 +188,7 @@ class StorageChain(ChainBase):
# 检查和删除上级目录
if dir_item and len(Path(dir_item.path).parts) > 2:
# 如何目录是所有下载目录、媒体库目录的上级,则不处理
for d in self.directoryhelper.get_dirs():
for d in DirectoryHelper().get_dirs():
if d.download_path and Path(d.download_path).is_relative_to(Path(dir_item.path)):
logger.debug(f"{dir_item.storage}{dir_item.path} 是下载目录本级或上级目录,不删除")
return True

View File

@@ -24,36 +24,20 @@ from app.db.models.subscribe import Subscribe
from app.db.site_oper import SiteOper
from app.db.subscribe_oper import SubscribeOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.subscribe import SubscribeHelper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import MediaRecognizeConvertEventData
from app.schemas.types import MediaType, SystemConfigKey, MessageChannel, NotificationType, EventType, ChainEventType, \
ContentType
from app.utils.singleton import Singleton
class SubscribeChain(ChainBase, metaclass=Singleton):
class SubscribeChain(ChainBase):
"""
订阅管理处理链
"""
def __init__(self):
super().__init__()
self._rlock = threading.RLock()
self.downloadchain = DownloadChain()
self.downloadhis = DownloadHistoryOper()
self.searchchain = SearchChain()
self.subscribeoper = SubscribeOper()
self.subscribehelper = SubscribeHelper()
self.torrentschain = TorrentsChain()
self.mediachain = MediaChain()
self.tmdbchain = TmdbChain()
self.message = MessageHelper()
self.systemconfig = SystemConfigOper()
self.torrenthelper = TorrentHelper()
self.siteoper = SiteOper()
_rlock = threading.RLock()
def add(self, title: str, year: str,
mtype: MediaType = None,
@@ -87,11 +71,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
if event and event.event_data:
event_data: MediaRecognizeConvertEventData = event.event_data
if event_data.media_dict:
mediachain = MediaChain()
new_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
return self.mediachain.recognize_media(meta=_meta, tmdbid=new_id)
return mediachain.recognize_media(meta=_meta, tmdbid=new_id)
elif event_data.convert_type == "douban":
return self.mediachain.recognize_media(meta=_meta, doubanid=new_id)
return mediachain.recognize_media(meta=_meta, doubanid=new_id)
return None
logger.info(f'开始添加订阅,标题:{title} ...')
@@ -111,7 +96,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
if not tmdbid:
if doubanid:
# 将豆瓣信息转换为TMDB信息
tmdbinfo = self.mediachain.get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype)
tmdbinfo = MediaChain().get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype)
if tmdbinfo:
mediainfo = MediaInfo(tmdb_info=tmdbinfo)
elif mediaid:
@@ -214,7 +199,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"filter_groups") else kwargs.get("filter_groups")
})
# 操作数据库
sid, err_msg = self.subscribeoper.add(mediainfo=mediainfo, season=season, username=username, **kwargs)
sid, err_msg = SubscribeOper().add(mediainfo=mediainfo, season=season, username=username, **kwargs)
if not sid:
logger.error(f'{mediainfo.title_year} {err_msg}')
if not exist_ok and message:
@@ -253,7 +238,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"mediainfo": mediainfo.to_dict(),
})
# 统计订阅
self.subscribehelper.sub_reg_async({
SubscribeHelper().sub_reg_async({
"name": title,
"year": year,
"type": metainfo.type.value,
@@ -271,13 +256,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 返回结果
return sid, ""
def exists(self, mediainfo: MediaInfo, meta: MetaBase = None):
@staticmethod
def exists(mediainfo: MediaInfo, meta: MetaBase = None):
"""
判断订阅是否已存在
"""
if self.subscribeoper.exists(tmdbid=mediainfo.tmdb_id,
doubanid=mediainfo.douban_id,
season=meta.begin_season if meta else None):
if SubscribeOper().exists(tmdbid=mediainfo.tmdb_id,
doubanid=mediainfo.douban_id,
season=meta.begin_season if meta else None):
return True
return False
@@ -291,11 +277,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"""
with self._rlock:
logger.debug(f"search lock acquired at {datetime.now()}")
subscribeoper = SubscribeOper()
if sid:
subscribe = self.subscribeoper.get(sid)
subscribe = subscribeoper.get(sid)
subscribes = [subscribe] if subscribe else []
else:
subscribes = self.subscribeoper.list(self.get_states_for_search(state))
subscribes = subscribeoper.list(self.get_states_for_search(state))
# 遍历订阅
for subscribe in subscribes:
if global_vars.is_system_stopped:
@@ -350,20 +337,20 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 优先级过滤规则
if subscribe.best_version:
rule_groups = subscribe.filter_groups \
or self.systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups) or []
or SystemConfigOper().get(SystemConfigKey.BestVersionFilterRuleGroups) or []
else:
rule_groups = subscribe.filter_groups \
or self.systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups) or []
or SystemConfigOper().get(SystemConfigKey.SubscribeFilterRuleGroups) or []
# 搜索,同时电视剧会过滤掉不需要的剧集
contexts = self.searchchain.process(mediainfo=mediainfo,
keyword=subscribe.keyword,
no_exists=no_exists,
sites=sites,
rule_groups=rule_groups,
area="imdbid" if subscribe.search_imdbid else "title",
custom_words=custom_word_list,
filter_params=self.get_params(subscribe))
contexts = SearchChain().process(mediainfo=mediainfo,
keyword=subscribe.keyword,
no_exists=no_exists,
sites=sites,
rule_groups=rule_groups,
area="imdbid" if subscribe.search_imdbid else "title",
custom_words=custom_word_list,
filter_params=self.get_params(subscribe))
if not contexts:
logger.warn(f'订阅 {subscribe.keyword or subscribe.name} 未搜索到资源')
self.finish_subscribe_or_not(subscribe=subscribe, meta=meta,
@@ -404,7 +391,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
continue
# 自动下载
downloads, lefts = self.downloadchain.batch_download(
downloads, lefts = DownloadChain().batch_download(
contexts=matched_contexts,
no_exists=no_exists,
userid=subscribe.username,
@@ -415,7 +402,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
)
# 同步外部修改,更新订阅信息
subscribe = self.subscribeoper.get(subscribe.id)
subscribe = subscribeoper.get(subscribe.id)
# 判断是否应完成订阅
if subscribe:
@@ -424,17 +411,17 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
finally:
# 如果状态为N则更新为R
if subscribe and subscribe.state == 'N':
self.subscribeoper.update(subscribe.id, {'state': 'R'})
subscribeoper.update(subscribe.id, {'state': 'R'})
# 手动触发时发送系统消息
if manual:
if subscribes:
if sid:
self.message.put(f'{subscribes[0].name} 搜索完成!', title="订阅搜索", role="system")
self.messagehelper.put(f'{subscribes[0].name} 搜索完成!', title="订阅搜索", role="system")
else:
self.message.put('所有订阅搜索完成!', title="订阅搜索", role="system")
self.messagehelper.put('所有订阅搜索完成!', title="订阅搜索", role="system")
else:
self.message.put('没有找到订阅!', title="订阅搜索", role="system")
self.messagehelper.put('没有找到订阅!', title="订阅搜索", role="system")
logger.debug(f"search Lock released at {datetime.now()}")
def update_subscribe_priority(self, subscribe: Subscribe, meta: MetaBase,
@@ -449,7 +436,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 当前下载资源的优先级
priority = max([item.torrent_info.pri_order for item in downloads])
# 订阅存在待定策略,不管是否已完成,均需更新订阅信息
self.subscribeoper.update(subscribe.id, {
SubscribeOper().update(subscribe.id, {
"current_priority": priority,
"last_update": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
@@ -506,17 +493,18 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
if sites is None:
return
self.match(
self.torrentschain.refresh(sites=sites)
TorrentsChain().refresh(sites=sites)
)
def get_sub_sites(self, subscribe: Subscribe) -> List[int]:
@staticmethod
def get_sub_sites(subscribe: Subscribe) -> List[int]:
"""
获取订阅中涉及的站点清单
:param subscribe: 订阅信息对象
:return: 涉及的站点清单
"""
# 从系统配置获取默认订阅站点
default_sites = self.systemconfig.get(SystemConfigKey.RssSites) or []
default_sites = SystemConfigOper().get(SystemConfigKey.RssSites) or []
# 如果订阅未指定站点,直接返回默认站点
if not subscribe.sites:
return default_sites
@@ -536,7 +524,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
:return: 返回[]代表所有站点命中返回None代表没有订阅
"""
# 查询所有订阅
subscribes = self.subscribeoper.list(self.get_states_for_search('R'))
subscribes = SubscribeOper().list(self.get_states_for_search('R'))
if not subscribes:
return None
ret_sites = []
@@ -561,7 +549,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
with self._rlock:
logger.debug(f"match lock acquired at {datetime.now()}")
# 所有订阅
subscribes = self.subscribeoper.list(self.get_states_for_search('R'))
subscribes = SubscribeOper().list(self.get_states_for_search('R'))
# 预识别所有未识别的种子
processed_torrents: Dict[str, List[Context]] = {}
@@ -598,7 +586,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 订阅的站点域名列表
domains = []
if subscribe.sites:
domains = self.siteoper.get_domains_by_ids(subscribe.sites)
domains = SiteOper().get_domains_by_ids(subscribe.sites)
# 识别媒体信息
mediainfo: MediaInfo = self.recognize_media(meta=meta, mtype=meta.type,
tmdbid=subscribe.tmdbid,
@@ -628,6 +616,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 遍历预识别后的种子
_match_context = []
torrenthelper = TorrentHelper()
systemconfig = SystemConfigOper()
wordsmatcher = WordsMatcher()
for domain, contexts in processed_torrents.items():
if global_vars.is_system_stopped:
break
@@ -650,8 +641,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 有自定义识别词时,需要判断是否需要重新识别
if custom_words_list:
# 使用org_string应用一次后理论上不能再次应用
_, apply_words = WordsMatcher().prepare(torrent_meta.org_string,
custom_words=custom_words_list)
_, apply_words = wordsmatcher.prepare(torrent_meta.org_string,
custom_words=custom_words_list)
if apply_words:
logger.info(
f'{torrent_info.site_name} - {torrent_info.title} 因订阅存在自定义识别词,重新识别元数据...')
@@ -673,9 +664,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
if not torrent_mediainfo or (not torrent_mediainfo.tmdb_id and not torrent_mediainfo.douban_id):
logger.info(
f'{torrent_info.site_name} - {torrent_info.title} 重新识别失败,尝试通过标题匹配...')
if self.torrenthelper.match_torrent(mediainfo=mediainfo,
torrent_meta=torrent_meta,
torrent=torrent_info):
if torrenthelper.match_torrent(mediainfo=mediainfo,
torrent_meta=torrent_meta,
torrent=torrent_info):
# 匹配成功
logger.info(
f'{mediainfo.title_year} 通过标题匹配到可选资源:{torrent_info.site_name} - {torrent_info.title}')
@@ -739,17 +730,17 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
continue
# 匹配订阅附加参数
if not self.torrenthelper.filter_torrent(torrent_info=torrent_info,
filter_params=self.get_params(subscribe)):
if not torrenthelper.filter_torrent(torrent_info=torrent_info,
filter_params=self.get_params(subscribe)):
continue
# 优先级过滤规则
if subscribe.best_version:
rule_groups = subscribe.filter_groups \
or self.systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups)
or systemconfig.get(SystemConfigKey.BestVersionFilterRuleGroups)
else:
rule_groups = subscribe.filter_groups \
or self.systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups)
or systemconfig.get(SystemConfigKey.SubscribeFilterRuleGroups)
result: List[TorrentInfo] = self.filter_torrents(
rule_groups=rule_groups,
torrent_list=[torrent_info],
@@ -789,33 +780,22 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 开始批量择优下载
logger.info(f'{mediainfo.title_year} 匹配完成,共匹配到{len(_match_context)}个资源')
downloads, lefts = self.downloadchain.batch_download(contexts=_match_context,
no_exists=no_exists,
userid=subscribe.username,
username=subscribe.username,
save_path=subscribe.save_path,
downloader=subscribe.downloader,
source=self.get_subscribe_source_keyword(subscribe)
)
downloads, lefts = DownloadChain().batch_download(contexts=_match_context,
no_exists=no_exists,
userid=subscribe.username,
username=subscribe.username,
save_path=subscribe.save_path,
downloader=subscribe.downloader,
source=self.get_subscribe_source_keyword(subscribe)
)
# 同步外部修改,更新订阅信息
subscribe = self.subscribeoper.get(subscribe.id)
subscribe = SubscribeOper().get(subscribe.id)
# 判断是否要完成订阅
if subscribe:
self.finish_subscribe_or_not(subscribe=subscribe, meta=meta, mediainfo=mediainfo,
downloads=downloads, lefts=lefts)
# 清理内存
_match_context.clear()
del _match_context
downloads.clear()
del downloads
lefts.clear()
del lefts
# 清理内存
processed_torrents.clear()
del processed_torrents
logger.debug(f"match Lock released at {datetime.now()}")
@@ -824,7 +804,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
定时检查订阅,更新订阅信息
"""
# 查询所有订阅
subscribes = self.subscribeoper.list()
subscribeoper = SubscribeOper()
subscribes = subscribeoper.list()
if not subscribes:
# 没有订阅不运行
return
@@ -863,7 +844,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
total_episode = subscribe.total_episode
lack_episode = subscribe.lack_episode
# 更新TMDB信息
self.subscribeoper.update(subscribe.id, {
subscribeoper.update(subscribe.id, {
"name": mediainfo.title,
"year": mediainfo.year,
"vote": mediainfo.vote_average,
@@ -877,28 +858,30 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
})
logger.info(f'{subscribe.name} 订阅元数据更新完成')
def follow(self):
@staticmethod
def follow():
"""
刷新follow的用户分享并自动添加订阅
"""
follow_users: List[str] = self.systemconfig.get(SystemConfigKey.FollowSubscribers)
follow_users: List[str] = SystemConfigOper().get(SystemConfigKey.FollowSubscribers)
if not follow_users:
return
share_subs = self.subscribehelper.get_shares()
share_subs = SubscribeHelper().get_shares()
logger.info(f'开始刷新follow用户分享订阅 ...')
success_count = 0
subscribeoper = SubscribeOper()
for share_sub in share_subs:
uid = share_sub.get("share_uid")
if uid and uid in follow_users:
# 订阅已存在则跳过
if self.subscribeoper.exists(tmdbid=share_sub.get("tmdbid"),
doubanid=share_sub.get("doubanid"),
season=share_sub.get("season")):
if subscribeoper.exists(tmdbid=share_sub.get("tmdbid"),
doubanid=share_sub.get("doubanid"),
season=share_sub.get("season")):
continue
# 已经订阅过跳过
if self.subscribeoper.exist_history(tmdbid=share_sub.get("tmdbid"),
doubanid=share_sub.get("doubanid"),
season=share_sub.get("season")):
if subscribeoper.exist_history(tmdbid=share_sub.get("tmdbid"),
doubanid=share_sub.get("doubanid"),
season=share_sub.get("season")):
continue
# 去除无效属性
for key in list(share_sub.keys()):
@@ -939,7 +922,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
logger.info(f'follow用户分享订阅刷新完成共添加 {success_count} 个订阅')
def __update_subscribe_note(self, subscribe: Subscribe, downloads: Optional[List[Context]]):
@staticmethod
def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]):
"""
更新已下载信息到note字段
"""
@@ -971,7 +955,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
note = list(set(note).union(set(items)))
# 更新订阅
if note:
self.subscribeoper.update(subscribe.id, {
SubscribeOper().update(subscribe.id, {
"note": note
})
@@ -995,7 +979,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
return note
return []
def __update_lack_episodes(self, lefts: Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]],
@staticmethod
def __update_lack_episodes(lefts: Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]],
subscribe: Subscribe,
mediainfo: MediaInfo,
update_date: Optional[bool] = False):
@@ -1028,7 +1013,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
update_data["lack_episode"] = lack_episode
# 更新数据库
if update_data:
self.subscribeoper.update(subscribe.id, update_data)
SubscribeOper().update(subscribe.id, update_data)
def __finish_subscribe(self, subscribe: Subscribe, mediainfo: MediaInfo, meta: MetaBase):
"""
@@ -1041,9 +1026,10 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
msgstr = "订阅" if not subscribe.best_version else "洗版"
logger.info(f'{mediainfo.title_year} 完成{msgstr}')
# 新增订阅历史
self.subscribeoper.add_history(**subscribe.to_dict())
subscribeoper = SubscribeOper()
subscribeoper.add_history(**subscribe.to_dict())
# 删除订阅
self.subscribeoper.delete(subscribe.id)
subscribeoper.delete(subscribe.id)
# 发送通知
if mediainfo.type == MediaType.TV:
link = settings.MP_DOMAIN('#/subscribe/tv?tab=mysub')
@@ -1070,7 +1056,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"mediainfo": mediainfo.to_dict(),
})
# 统计订阅
self.subscribehelper.sub_done_async({
SubscribeHelper().sub_done_async({
"tmdbid": mediainfo.tmdb_id,
"doubanid": mediainfo.douban_id
})
@@ -1080,7 +1066,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"""
查询订阅并发送消息
"""
subscribes = self.subscribeoper.list()
subscribes = SubscribeOper().list()
if not subscribes:
self.post_message(schemas.Notification(channel=channel,
source=source,
@@ -1114,20 +1100,22 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"[id]为订阅编号", userid=userid))
return
arg_strs = str(arg_str).split()
subscribeoper = SubscribeOper()
subscribehelper = SubscribeHelper()
for arg_str in arg_strs:
arg_str = arg_str.strip()
if not arg_str.isdigit():
continue
subscribe_id = int(arg_str)
subscribe = self.subscribeoper.get(subscribe_id)
subscribe = subscribeoper.get(subscribe_id)
if not subscribe:
self.post_message(schemas.Notification(channel=channel, source=source,
title=f"订阅编号 {subscribe_id} 不存在!", userid=userid))
return
# 删除订阅
self.subscribeoper.delete(subscribe_id)
subscribeoper.delete(subscribe_id)
# 统计订阅
self.subscribehelper.sub_done_async({
subscribehelper.sub_done_async({
"tmdbid": subscribe.tmdbid,
"doubanid": subscribe.doubanid
})
@@ -1253,13 +1241,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
site_id = event_data.get("site_id")
if not site_id:
return
subscribeoper = SubscribeOper()
if site_id == "*":
# 站点被重置
SystemConfigOper().set(SystemConfigKey.RssSites, [])
for subscribe in self.subscribeoper.list():
for subscribe in subscribeoper.list():
if not subscribe.sites:
continue
self.subscribeoper.update(subscribe.id, {
subscribeoper.update(subscribe.id, {
"sites": []
})
return
@@ -1269,14 +1258,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
selected_sites.remove(site_id)
SystemConfigOper().set(SystemConfigKey.RssSites, selected_sites)
# 查询所有订阅
for subscribe in self.subscribeoper.list():
for subscribe in subscribeoper.list():
if not subscribe.sites:
continue
sites = subscribe.sites or []
if site_id not in sites:
continue
sites.remove(site_id)
self.subscribeoper.update(subscribe.id, {
subscribeoper.update(subscribe.id, {
"sites": sites
})
@@ -1301,12 +1290,13 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
return None
return value.get(default_config_key) or None
def get_params(self, subscribe: Subscribe):
@staticmethod
def get_params(subscribe: Subscribe):
"""
获取订阅默认参数
"""
# 默认过滤规则
default_rule = self.systemconfig.get(SystemConfigKey.SubscribeDefaultParams) or {}
default_rule = SystemConfigOper().get(SystemConfigKey.SubscribeDefaultParams) or {}
return {
key: value for key, value in {
"include": subscribe.include or default_rule.get("include"),
@@ -1334,7 +1324,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
episodes: Dict[int, schemas.SubscribeEpisodeInfo] = {}
if subscribe.tmdbid and subscribe.type == MediaType.TV.value:
# 查询TMDB中的集信息
tmdb_episodes = self.tmdbchain.tmdb_episodes(
tmdb_episodes = TmdbChain().tmdb_episodes(
tmdbid=subscribe.tmdbid,
season=subscribe.season,
episode_group=subscribe.episode_group
@@ -1359,11 +1349,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
episodes[0] = info
# 所有下载记录
download_his = self.downloadhis.get_by_mediaid(tmdbid=subscribe.tmdbid, doubanid=subscribe.doubanid)
downloadhis = DownloadHistoryOper()
download_his = downloadhis.get_by_mediaid(tmdbid=subscribe.tmdbid, doubanid=subscribe.doubanid)
if download_his:
for his in download_his:
# 查询下载文件
files = self.downloadhis.get_files_by_hash(his.download_hash)
files = downloadhis.get_files_by_hash(his.download_hash)
if files:
for file in files:
# 识别文件名
@@ -1457,7 +1448,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
subscribe.season: subscribe.total_episode
}
# 查询媒体库缺失的媒体信息
exist_flag, no_exists = self.downloadchain.get_no_exists_info(
exist_flag, no_exists = DownloadChain().get_no_exists_info(
meta=meta,
mediainfo=mediainfo,
totals=totals

View File

@@ -8,24 +8,18 @@ from app.core.config import settings
from app.log import logger
from app.schemas import Notification, MessageChannel
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.system import SystemUtils
from helper.system import SystemHelper
from app.helper.system import SystemHelper
from version import FRONTEND_VERSION, APP_VERSION
class SystemChain(ChainBase, metaclass=Singleton):
class SystemChain(ChainBase):
"""
系统级处理链
"""
_restart_file = "__system_restart__"
def __init__(self):
super().__init__()
# 重启完成检测
self.restart_finish()
def remote_clear_cache(self, channel: MessageChannel, userid: Union[int, str], source: Optional[str] = None):
"""
清理系统缓存

View File

@@ -3,13 +3,11 @@ from typing import Optional, List
from app import schemas
from app.chain import ChainBase
from app.core.cache import cached
from app.core.context import MediaInfo
from app.schemas import MediaType
from app.utils.singleton import Singleton
class TmdbChain(ChainBase, metaclass=Singleton):
class TmdbChain(ChainBase):
"""
TheMovieDB处理链单例运行
"""
@@ -145,7 +143,6 @@ class TmdbChain(ChainBase, metaclass=Singleton):
"""
return self.run_module("tmdb_person_credits", person_id=person_id, page=page)
@cached(maxsize=1, ttl=3600)
def get_random_wallpager(self) -> Optional[str]:
"""
获取随机壁纸缓存1个小时
@@ -159,7 +156,6 @@ class TmdbChain(ChainBase, metaclass=Singleton):
return info.backdrop_path
return None
@cached(maxsize=1, ttl=3600)
def get_trending_wallpapers(self, num: Optional[int] = 10) -> List[str]:
"""
获取所有流行壁纸

View File

@@ -17,11 +17,10 @@ from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import Notification
from app.schemas.types import SystemConfigKey, MessageChannel, NotificationType, MediaType
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
class TorrentsChain(ChainBase, metaclass=Singleton):
class TorrentsChain(ChainBase):
"""
站点首页或RSS种子处理链服务于订阅、刷流等
"""
@@ -29,22 +28,6 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
_spider_file = "__torrents_cache__"
_rss_file = "__rss_cache__"
def __init__(self):
super().__init__()
self.siteshelper = SitesHelper()
self.siteoper = SiteOper()
self.rsshelper = RssHelper()
self.systemconfig = SystemConfigOper()
self.mediachain = MediaChain()
self.torrenthelper = TorrentHelper()
def __del__(self):
"""
析构函数,停止内存监控
"""
if hasattr(self, 'memory_manager'):
self.memory_manager.stop_monitoring()
@property
def cache_file(self) -> str:
"""
@@ -99,7 +82,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
:param page: 页码
"""
logger.info(f'开始获取站点 {domain} 最新种子 ...')
site = self.siteshelper.get_indexer(domain)
site = SitesHelper().get_indexer(domain)
if not site:
logger.error(f'站点 {domain} 不存在!')
return []
@@ -112,15 +95,15 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
:param domain: 站点域名
"""
logger.info(f'开始获取站点 {domain} RSS ...')
site = self.siteshelper.get_indexer(domain)
site = SitesHelper().get_indexer(domain)
if not site:
logger.error(f'站点 {domain} 不存在!')
return []
if not site.get("rss"):
logger.error(f'站点 {domain} 未配置RSS地址')
return []
rss_items = self.rsshelper.parse(site.get("rss"), True if site.get("proxy") else False,
timeout=int(site.get("timeout") or 30))
rss_items = RssHelper().parse(site.get("rss"), True if site.get("proxy") else False,
timeout=int(site.get("timeout") or 30))
if rss_items is None:
# rss过期尝试保留原配置生成新的rss
self.__renew_rss_url(domain=domain, site=site)
@@ -164,7 +147,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
# 刷新站点
if not sites:
sites = self.systemconfig.get(SystemConfigKey.RssSites) or []
sites = SystemConfigOper().get(SystemConfigKey.RssSites) or []
# 读取缓存
torrents_cache = self.get_torrents()
@@ -172,10 +155,10 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
# 缓存过滤掉无效种子
for _domain, _torrents in torrents_cache.items():
torrents_cache[_domain] = [_torrent for _torrent in _torrents
if not self.torrenthelper.is_invalid(_torrent.torrent_info.enclosure)]
if not TorrentHelper().is_invalid(_torrent.torrent_info.enclosure)]
# 所有站点索引
indexers = self.siteshelper.get_indexers()
indexers = SitesHelper().get_indexers()
# 需要刷新的站点domain
domains = []
@@ -222,7 +205,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
and torrent.category == MediaType.TV.value:
meta.type = MediaType.TV
# 识别媒体信息
mediainfo: MediaInfo = self.mediachain.recognize_by_meta(meta)
mediainfo: MediaInfo = MediaChain().recognize_by_meta(meta)
if not mediainfo:
logger.warn(f'{torrent.title} 未识别到媒体信息')
# 存储空的媒体信息
@@ -282,7 +265,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
# RSS链接过期
logger.error(f"站点 {domain} RSS链接已过期正在尝试自动获取")
# 自动生成rss地址
rss_url, errmsg = self.rsshelper.get_rss_link(
rss_url, errmsg = RssHelper().get_rss_link(
url=site.get("url"),
cookie=site.get("cookie"),
ua=site.get("ua") or settings.USER_AGENT,
@@ -296,7 +279,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
# 获取过期rss除去passkey部分
new_rss = re.sub(r'&passkey=([a-zA-Z0-9]+)', f'&passkey={new_passkey}', site.get("rss"))
logger.info(f"更新站点 {domain} RSS地址 ...")
self.siteoper.update_rss(domain=domain, rss=new_rss)
SiteOper().update_rss(domain=domain, rss=new_rss)
else:
# 发送消息
self.post_message(

View File

@@ -328,7 +328,8 @@ class JobManager:
# 计算状态为完成的任务数
if __mediaid__ not in self._job_view:
return 0
return sum([task.fileitem.size for task in self._job_view[__mediaid__].tasks if task.state == "completed" and task.fileitem.size is not None])
return sum([task.fileitem.size for task in self._job_view[__mediaid__].tasks if
task.state == "completed" and task.fileitem.size is not None])
def total(self) -> int:
"""
@@ -371,14 +372,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
def __init__(self):
super().__init__()
self.downloadhis = DownloadHistoryOper()
self.transferhis = TransferHistoryOper()
self.progress = ProgressHelper()
self.mediachain = MediaChain()
self.tmdbchain = TmdbChain()
self.storagechain = StorageChain()
self.systemconfig = SystemConfigOper()
self.directoryhelper = DirectoryHelper()
self.jobview = JobManager()
# 启动整理任务
@@ -397,11 +390,12 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
整理完成后处理
"""
transferhis = TransferHistoryOper()
if not transferinfo.success:
# 转移失败
logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
transferhis.add_fail(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
@@ -428,7 +422,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
transferhis.add_success(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
@@ -457,6 +451,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 记录已处理的种子hash
processed_hashes = set()
storagechain = StorageChain()
for t in tasks:
# 下载器hash
if t.download_hash and t.download_hash not in processed_hashes:
@@ -465,7 +460,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录
if t.fileitem:
self.storagechain.delete_media_file(t.fileitem, delete_self=False)
storagechain.delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送通知,实时手动整理时不发
@@ -543,6 +538,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 失败数量
fail_num = 0
progress = ProgressHelper()
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(block=False)
@@ -556,24 +553,24 @@ class TransferChain(ChainBase, metaclass=Singleton):
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
progress.start(ProgressKey.FileTransfer)
# 重置计数
processed_num = 0
fail_num = 0
total_num = self.jobview.total()
__process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..."
logger.info(__process_msg)
self.progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 队列已开始
__queue_start = False
# 更新进度
__process_msg = f"正在整理 {fileitem.name} ..."
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state:
@@ -583,18 +580,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
processed_num += 1
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
# 重置计数
processed_num = 0
fail_num = 0
@@ -614,6 +611,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
try:
# 识别
transferhis = TransferHistoryOper()
if not task.mediainfo:
mediainfo = None
download_history = task.download_history
@@ -633,7 +631,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
mediainfo.category = download_history.media_category
else:
# 识别媒体信息
mediainfo = self.mediachain.recognize_by_meta(task.meta)
mediainfo = MediaChain().recognize_by_meta(task.meta)
# 更新媒体图片
if mediainfo:
@@ -641,7 +639,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not mediainfo:
# 新增整理失败历史记录
his = self.transferhis.add_fail(
his = transferhis.add_fail(
fileitem=task.fileitem,
mode=task.transfer_type,
meta=task.meta,
@@ -661,8 +659,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history:
mediainfo.title = transfer_history.title
@@ -682,7 +680,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 默认值1
if season_num is None:
season_num = 1
task.episodes_info = self.tmdbchain.tmdb_episodes(
task.episodes_info = TmdbChain().tmdb_episodes(
tmdbid=task.mediainfo.tmdb_id,
season=season_num,
episode_group=task.mediainfo.episode_group
@@ -692,15 +690,15 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not task.target_directory:
if task.target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
dest_path=task.target_path,
target_storage=task.target_storage)
task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo,
dest_path=task.target_path,
target_storage=task.target_storage)
else:
# 启用源目录匹配时,根据源目录匹配下载目录,否则按源目录同盘优先原则,如无源目录,则根据媒体信息获取目标目录
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
if not task.target_storage and task.target_directory:
task.target_storage = task.target_directory.library_storage
@@ -784,7 +782,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 全局锁,避免重复处理
with downloader_lock:
# 获取下载器监控目录
download_dirs = self.directoryhelper.get_download_dirs()
download_dirs = DirectoryHelper().get_download_dirs()
# 如果没有下载器监控的目录则不处理
if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local"
for dir_info in download_dirs):
@@ -820,7 +818,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理")
continue
# 查询下载记录识别情况
downloadhis: DownloadHistory = self.downloadhis.get_by_hash(torrent.hash)
downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash(torrent.hash)
if downloadhis:
# 类型
try:
@@ -868,7 +866,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
return True
def __get_trans_fileitems(
self, fileitem: FileItem, depth: int = 1
self, fileitem: FileItem, depth: int = 1
) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
@@ -876,6 +874,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param fileitem: 文件项
:param depth: 递归深度默认为1
"""
storagechain = StorageChain()
def __contains_bluray_sub(_fileitems: List[FileItem]) -> bool:
"""
@@ -899,10 +898,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
for p in _path.parents:
if p.name == "BDMV":
return self.storagechain.get_file_item(storage=_storage, path=p.parent)
return storagechain.get_file_item(storage=_storage, path=p.parent)
return None
if not self.storagechain.get_item(fileitem):
if not storagechain.get_item(fileitem):
logger.warn(f"目录或文件不存在:{fileitem.path}")
return []
@@ -917,7 +916,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
return [(fileitem, False)]
# 蓝光原盘根目录
sub_items = self.storagechain.list_files(fileitem) or []
sub_items = storagechain.list_files(fileitem) or []
if __contains_bluray_sub(sub_items):
return [(fileitem, True)]
@@ -994,7 +993,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
offset=epformat.offset) if epformat else None
# 整理屏蔽词
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
# 汇总错误信息
err_msgs: List[str] = []
# 待整理目录或文件项
@@ -1013,7 +1012,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 如果是目录且不是⼀蓝光原盘,获取所有文件并整理
if trans_item.type == "dir" and not bluray_dir:
# 遍历获取下载目录所有文件(递归)
if files := self.storagechain.list_files(trans_item, recursion=True):
if files := StorageChain().list_files(trans_item, recursion=True):
file_items.extend([(file, False) for file in files])
else:
file_items.append((trans_item, bluray_dir))
@@ -1062,7 +1061,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 整理成功的不再处理
if not force:
transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage)
transferd = TransferHistoryOper().get_by_src(file_item.path, storage=file_item.storage)
if transferd:
if not transferd.status:
all_success = False
@@ -1098,14 +1097,15 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 根据父路径获取下载历史
download_history = None
downloadhis = DownloadHistoryOper()
if bluray_dir:
# 蓝光原盘,按目录名查询
download_history = self.downloadhis.get_by_path(str(file_path))
download_history = downloadhis.get_by_path(str(file_path))
else:
# 按文件全路径查询
download_file = self.downloadhis.get_file_by_fullpath(str(file_path))
download_file = downloadhis.get_file_by_fullpath(str(file_path))
if download_file:
download_history = self.downloadhis.get_by_hash(download_file.download_hash)
download_history = downloadhis.get_by_hash(download_file.download_hash)
# 获取下载Hash
if download_history and (not downloader or not download_hash):
@@ -1148,12 +1148,13 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num = 0
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
__process_msg = f"开始整理,共 {total_num} 个文件 ..."
logger.info(__process_msg)
self.progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
for transfer_task in transfer_tasks:
if global_vars.is_system_stopped:
@@ -1163,9 +1164,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 更新进度
__process_msg = f"正在整理 {processed_num + fail_num + 1}/{total_num}{transfer_task.fileitem.name} ..."
logger.info(__process_msg)
self.progress.update(value=(processed_num + fail_num) / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
progress.update(value=(processed_num + fail_num) / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
state, err_msg = self.__handle_transfer(
task=transfer_task,
callback=self.__default_callback
@@ -1181,10 +1182,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 整理结束
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
return all_success, "".join(err_msgs)
@@ -1239,7 +1240,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param mediaid: TMDB ID/豆瓣ID
"""
# 查询历史记录
history: TransferHistory = self.transferhis.get(logid)
history: TransferHistory = TransferHistoryOper().get(logid)
if not history:
logger.error(f"整理记录不存在ID{logid}")
return False, "整理记录不存在"
@@ -1255,7 +1256,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
else:
mediainfo = self.mediachain.recognize_by_path(str(src_path), episode_group=history.episode_group)
mediainfo = MediaChain().recognize_by_path(str(src_path), episode_group=history.episode_group)
if not mediainfo:
return False, f"未识别到媒体信息,类型:{mtype.value}id{mediaid}"
# 重新执行整理
@@ -1265,7 +1266,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if history.dest_fileitem:
# 解析目标文件对象
dest_fileitem = FileItem(**history.dest_fileitem)
self.storagechain.delete_file(dest_fileitem)
StorageChain().delete_file(dest_fileitem)
# 强制整理
if history.src_fileitem:
@@ -1320,18 +1321,19 @@ class TransferChain(ChainBase, metaclass=Singleton):
if tmdbid or doubanid:
# 有输入TMDBID时单个识别
# 识别媒体信息
mediainfo: MediaInfo = self.mediachain.recognize_media(tmdbid=tmdbid, doubanid=doubanid,
mtype=mtype, episode_group=episode_group)
mediainfo: MediaInfo = MediaChain().recognize_media(tmdbid=tmdbid, doubanid=doubanid,
mtype=mtype, episode_group=episode_group)
if not mediainfo:
return False, f"媒体信息识别失败tmdbid{tmdbid}doubanid{doubanid}type: {mtype.value}"
else:
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
self.progress.update(value=0,
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
progress.update(value=0,
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始整理
state, errmsg = self.do_transfer(
fileitem=fileitem,
@@ -1352,7 +1354,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not state:
return False, errmsg
self.progress.end(ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
logger.info(f"{fileitem.path} 整理完成")
return True, ""
else:
@@ -1373,7 +1375,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
return state, errmsg
def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo,
transferinfo: TransferInfo, season_episode: Optional[str] = None, username: Optional[str] = None):
transferinfo: TransferInfo, season_episode: Optional[str] = None,
username: Optional[str] = None):
"""
发送入库成功的消息
"""

View File

@@ -1,9 +1,9 @@
from typing import List
from app.chain import ChainBase
from app.utils.singleton import Singleton
class TvdbChain(ChainBase, metaclass=Singleton):
class TvdbChain(ChainBase):
"""
Tvdb处理链单例运行
"""

View File

@@ -10,20 +10,15 @@ from app.log import logger
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import ChainEventType
from app.utils.otp import OtpUtils
from app.utils.singleton import Singleton
PASSWORD_INVALID_CREDENTIALS_MESSAGE = "用户名或密码或二次校验码不正确"
class UserChain(ChainBase, metaclass=Singleton):
class UserChain(ChainBase):
"""
用户链,处理多种认证协议
"""
def __init__(self):
super().__init__()
self.user_oper = UserOper()
def user_authenticate(
self,
username: Optional[str] = None,
@@ -90,7 +85,8 @@ class UserChain(ChainBase, metaclass=Singleton):
logger.debug(f"辅助认证未启用,认证类型 {grant_type} 未实现")
return False, "不支持的认证类型"
def password_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]:
@staticmethod
def password_authenticate(credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]:
"""
密码认证
@@ -103,7 +99,7 @@ class UserChain(ChainBase, metaclass=Singleton):
logger.info("密码认证失败,认证类型不匹配")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
user = self.user_oper.get_by_name(name=credentials.username)
user = UserOper().get_by_name(name=credentials.username)
if not user:
logger.info(f"密码认证失败,用户 {credentials.username} 不存在")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
@@ -131,8 +127,9 @@ class UserChain(ChainBase, metaclass=Singleton):
return False, "认证凭证无效"
# 检查是否因为用户被禁用
useroper = UserOper()
if credentials.username:
user = self.user_oper.get_by_name(name=credentials.username)
user = useroper.get_by_name(name=credentials.username)
if user and not user.is_active:
logger.info(f"用户 {user.name} 已被禁用,跳过后续身份校验")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
@@ -156,7 +153,7 @@ class UserChain(ChainBase, metaclass=Singleton):
success = self._process_auth_success(username=credentials.username, credentials=credentials)
if success:
logger.info(f"用户 {credentials.username} 辅助认证通过")
return True, self.user_oper.get_by_name(credentials.username)
return True, useroper.get_by_name(credentials.username)
else:
logger.warning(f"用户 {credentials.username} 辅助认证未通过")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
@@ -213,7 +210,8 @@ class UserChain(ChainBase, metaclass=Singleton):
return False
# 检查用户是否存在,如果不存在且当前为密码认证时则创建新用户
user = self.user_oper.get_by_name(name=username)
useroper = UserOper()
user = useroper.get_by_name(name=username)
if user:
# 如果用户存在,但是已经被禁用,则直接响应
if not user.is_active:
@@ -226,8 +224,8 @@ class UserChain(ChainBase, metaclass=Singleton):
return True
else:
if credentials.grant_type == "password":
self.user_oper.add(name=username, is_active=True, is_superuser=False,
hashed_password=get_password_hash(secrets.token_urlsafe(16)))
useroper.add(name=username, is_active=True, is_superuser=False,
hashed_password=get_password_hash(secrets.token_urlsafe(16)))
logger.info(f"用户 {username} 不存在,已通过 {credentials.grant_type} 认证并已创建普通用户")
return True
else:

View File

@@ -2,10 +2,9 @@ from typing import Any
from app.chain import ChainBase
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class WebhookChain(ChainBase, metaclass=Singleton):
class WebhookChain(ChainBase):
"""
Webhook处理链
"""

View File

@@ -188,16 +188,14 @@ class WorkflowChain(ChainBase):
工作流链
"""
def __init__(self):
super().__init__()
self.workflowoper = WorkflowOper()
def process(self, workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]:
@staticmethod
def process(workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]:
"""
处理工作流
:param workflow_id: 工作流ID
:param from_begin: 是否从头开始默认为True
"""
workflowoper = WorkflowOper()
def save_step(action: Action, context: ActionContext):
"""
@@ -207,16 +205,16 @@ class WorkflowChain(ChainBase):
serialized_data = pickle.dumps(context)
# 使用Base64编码字节流
encoded_data = base64.b64encode(serialized_data).decode('utf-8')
self.workflowoper.step(workflow_id, action_id=action.id, context={
workflowoper.step(workflow_id, action_id=action.id, context={
"content": encoded_data
})
# 重置工作流
if from_begin:
self.workflowoper.reset(workflow_id)
workflowoper.reset(workflow_id)
# 查询工作流数据
workflow = self.workflowoper.get(workflow_id)
workflow = workflowoper.get(workflow_id)
if not workflow:
logger.warn(f"工作流 {workflow_id} 不存在")
return False, "工作流不存在"
@@ -228,7 +226,7 @@ class WorkflowChain(ChainBase):
return False, "工作流无流程"
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
self.workflowoper.start(workflow_id)
workflowoper.start(workflow_id)
# 执行工作流
executor = WorkflowExecutor(workflow, step_callback=save_step)
@@ -236,15 +234,16 @@ class WorkflowChain(ChainBase):
if not executor.success:
logger.info(f"工作流 {workflow.name} 执行失败:{executor.errmsg}")
self.workflowoper.fail(workflow_id, result=executor.errmsg)
workflowoper.fail(workflow_id, result=executor.errmsg)
return False, executor.errmsg
else:
logger.info(f"工作流 {workflow.name} 执行完成")
self.workflowoper.success(workflow_id)
workflowoper.success(workflow_id)
return True, ""
def get_workflows(self) -> List[Workflow]:
@staticmethod
def get_workflows() -> List[Workflow]:
"""
获取工作流列表
"""
return self.workflowoper.list_enabled()
return WorkflowOper().list_enabled()

View File

@@ -61,7 +61,8 @@ class TemplateContextBuilder:
self._add_transfer_info(transferinfo)
self._add_torrent_info(torrentinfo)
self._add_file_info(file_extension)
if kwargs: self._context.update(kwargs)
if kwargs:
self._context.update(kwargs)
if include_raw_objects:
self._add_raw_objects(meta, mediainfo, torrentinfo, transferinfo, episodes_info)
@@ -73,7 +74,8 @@ class TemplateContextBuilder:
"""
增加媒体信息
"""
if not mediainfo: return
if not mediainfo:
return
season_fmt = f"S{mediainfo.season:02d}" if mediainfo.season is not None else None
base_info = {
# 标题
@@ -245,7 +247,8 @@ class TemplateContextBuilder:
"""
添加文件信息
"""
if not file_extension: return
if not file_extension:
return
file_info = {
# 文件后缀
"fileExt": file_extension,

View File

@@ -1,5 +1,7 @@
from typing import Optional, List
from app.chain.mediaserver import MediaServerChain
from app.chain.tmdb import TmdbChain
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
@@ -11,6 +13,49 @@ class WallpaperHelper(metaclass=Singleton):
def __init__(self):
self.req = RequestUtils(timeout=5)
@staticmethod
def get_wallpaper() -> Optional[str]:
"""
获取登录页面壁纸
"""
if settings.WALLPAPER == "bing":
url = WallpaperHelper().get_bing_wallpaper()
elif settings.WALLPAPER == "mediaserver":
url = MediaServerChain().get_latest_wallpaper()
elif settings.WALLPAPER == "customize":
url = WallpaperHelper().get_customize_wallpaper()
else:
url = WallpaperHelper().get_tmdb_wallpaper()
return url
@staticmethod
def get_wallpapers(num: int = 10) -> List[str]:
"""
获取登录页面壁纸列表
"""
if settings.WALLPAPER == "bing":
return WallpaperHelper().get_bing_wallpapers(num)
elif settings.WALLPAPER == "mediaserver":
return MediaServerChain().get_latest_wallpapers(count=num)
elif settings.WALLPAPER == "customize":
return WallpaperHelper().get_customize_wallpapers(num)
else:
return WallpaperHelper().get_tmdb_wallpapers(num)
@cached(maxsize=1, ttl=3600)
def get_tmdb_wallpaper(self) -> Optional[str]:
"""
获取TMDB每日壁纸
"""
return TmdbChain().get_random_wallpager()
@cached(maxsize=1, ttl=3600)
def get_tmdb_wallpapers(self, num: int = 10) -> List[str]:
"""
获取7天的TMDB每日壁纸
"""
return TmdbChain().get_trending_wallpapers(num)
@cached(maxsize=1, ttl=3600)
def get_bing_wallpaper(self) -> Optional[str]:
"""

View File

@@ -3,6 +3,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.chain.system import SystemChain
from app.core.config import global_vars
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
@@ -23,6 +24,8 @@ async def init_plugin_system():
init_plugin_scheduler()
# 重新注册命令
restart_command()
# 重启完成
SystemChain().restart_finish()
@asynccontextmanager