diff --git a/app/actions/add_download.py b/app/actions/add_download.py index b7c3b012..3315ba7e 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -24,6 +24,11 @@ class AddDownloadAction(BaseAction): # 已添加的下载 _added_downloads = [] + def __init__(self): + super().__init__() + self.downloadchain = DownloadChain() + self.mediachain = MediaChain() + @property def name(self) -> str: return "添加下载资源" @@ -44,13 +49,13 @@ class AddDownloadAction(BaseAction): if not t.meta_info: t.meta_info = MetaInfo(title=t.title, subtitle=t.description) if not t.media_info: - t.media_info = MediaChain().recognize_media(meta=t.meta_info) + t.media_info = self.mediachain.recognize_media(meta=t.meta_info) if not t.media_info: logger.warning(f"{t.title} 未识别到媒体信息,无法下载") continue - did = DownloadChain().download_single(context=t, - downloader=params.downloader, - save_path=params.save_path) + did = self.downloadchain.download_single(context=t, + downloader=params.downloader, + save_path=params.save_path) if did: self._added_downloads.append(did) @@ -59,5 +64,6 @@ class AddDownloadAction(BaseAction): context.downloads.extend( [DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads] ) + self.job_done() return context diff --git a/app/actions/add_subscribe.py b/app/actions/add_subscribe.py index 481b149a..8aacae04 100644 --- a/app/actions/add_subscribe.py +++ b/app/actions/add_subscribe.py @@ -1,4 +1,8 @@ from app.actions import BaseAction +from app.chain.subscribe import SubscribeChain +from app.core.config import settings +from app.db.subscribe_oper import SubscribeOper +from app.log import logger from app.schemas import ActionParams, ActionContext @@ -14,6 +18,13 @@ class AddSubscribeAction(BaseAction): 添加订阅 """ + _added_subscribes = [] + + def __init__(self): + super().__init__() + self.subscribechain = SubscribeChain() + self.subscribeoper = SubscribeOper() + @property def name(self) -> str: return "添加订阅" @@ -24,7 +35,33 @@ class AddSubscribeAction(BaseAction): @property def success(self) -> bool: - return True + return True if self._added_subscribes else False async def execute(self, params: AddSubscribeParams, context: ActionContext) -> ActionContext: - pass + """ + 将medias中的信息添加订阅,如果订阅不存在的话 + """ + for media in context.medias: + if self.subscribechain.exists(media): + logger.info(f"{media.title} 已存在订阅") + continue + # 添加订阅 + sid, message = self.subscribechain.add(mtype=media.type, + title=media.title, + year=media.year, + tmdbid=media.tmdb_id, + season=media.season, + doubanid=media.douban_id, + bangumiid=media.bangumi_id, + mediaid=media.media_id, + username=settings.SUPERUSER) + if sid: + self._added_subscribes.append(sid) + + if self._added_subscribes: + logger.info(f"已添加 {len(self._added_subscribes)} 个订阅") + for sid in self._added_subscribes: + context.subscribes.append(self.subscribeoper.get(sid)) + + self.job_done() + return context diff --git a/app/actions/fetch_downloads.py b/app/actions/fetch_downloads.py index 2545ff94..491fc6ba 100644 --- a/app/actions/fetch_downloads.py +++ b/app/actions/fetch_downloads.py @@ -1,4 +1,5 @@ from app.actions import BaseAction +from app.chain.download import DownloadChain from app.schemas import ActionParams, ActionContext @@ -14,6 +15,12 @@ class FetchDownloadsAction(BaseAction): 获取下载任务 """ + _downloads = [] + + def __init__(self): + super().__init__() + self.downloadchain = DownloadChain() + @property def name(self) -> str: return "获取下载任务" @@ -24,7 +31,23 @@ class FetchDownloadsAction(BaseAction): @property def success(self) -> bool: - return True + if not self._downloads: + return True + return True if all([d.completed for d in self._downloads]) else False async def execute(self, params: FetchDownloadsParams, context: ActionContext) -> ActionContext: - pass + """ + 更新downloads中的下载任务状态 + """ + self._downloads = context.downloads + for download in self._downloads: + torrents = self.downloadchain.list_torrents(hashs=[download.download_id]) + if not torrents: + download.completed = True + continue + for t in torrents: + if t.progress >= 100: + download.completed = True + + self.job_done() + return context diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index def05ffe..9ea9eca4 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -3,7 +3,13 @@ from typing import Optional from pydantic import Field from app.actions import BaseAction -from app.schemas import ActionParams, ActionContext +from app.chain.media import MediaChain +from app.core.config import settings +from app.core.context import Context +from app.core.metainfo import MetaInfo +from app.helper.rss import RssHelper +from app.log import logger +from app.schemas import ActionParams, ActionContext, TorrentInfo class FetchRssParams(ActionParams): @@ -13,7 +19,9 @@ class FetchRssParams(ActionParams): url: str = Field(None, description="RSS地址") proxy: Optional[bool] = Field(False, description="是否使用代理") timeout: Optional[int] = Field(15, description="超时时间") - headers: Optional[dict] = Field(None, description="请求头") + content_type: Optional[str] = Field(None, description="Content-Type") + referer: Optional[str] = Field(None, description="Referer") + ua: Optional[str] = Field(None, description="User-Agent") recognize: Optional[bool] = Field(False, description="是否识别") @@ -22,6 +30,13 @@ class FetchRssAction(BaseAction): 获取RSS资源列表 """ + _rss_torrents = [] + + def __init__(self): + super().__init__() + self.rsshelper = RssHelper() + self.mediachain = MediaChain() + @property def name(self) -> str: return "获取RSS资源列表" @@ -32,7 +47,54 @@ class FetchRssAction(BaseAction): @property def success(self) -> bool: - return True + return True if self._rss_torrents else False async def execute(self, params: FetchRssParams, context: ActionContext) -> ActionContext: - pass + """ + 请求RSS地址获取数据,并解析为资源列表 + """ + if not params.url: + return context + + headers = {} + if params.content_type: + headers["Content-Type"] = params.content_type + if params.referer: + headers["Referer"] = params.referer + if params.ua: + headers["User-Agent"] = params.ua + + rss_items = self.rsshelper.parse(url=params.url, + proxy=settings.PROXY if params.proxy else None, + timeout=params.timeout, + headers=headers) + if not rss_items: + logger.error(f'RSS地址 {params.url} 未获取到RSS数据!') + return context + + # 组装种子 + for item in rss_items: + if not item.get("title"): + continue + torrentinfo = TorrentInfo( + title=item.get("title"), + enclosure=item.get("enclosure"), + page_url=item.get("link"), + size=item.get("size"), + pubdate=item["pubdate"].strftime("%Y-%m-%d %H:%M:%S") if item.get("pubdate") else None, + ) + meta, mediainfo = None, None + if params.recognize: + meta = MetaInfo(title=torrentinfo.title, subtitle=torrentinfo.description) + mediainfo = self.mediachain.recognize_media(meta) + if not mediainfo: + logger.warning(f"{torrentinfo.title} 未识别到媒体信息") + continue + self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo)) + + if self._rss_torrents: + logger.info(f"已获取 {len(self._rss_torrents)} 个RSS资源") + context.torrents.extend(self._rss_torrents) + + self.job_done() + return context diff --git a/app/core/workflow.py b/app/core/workflow.py index f6233716..363730a3 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -1,7 +1,6 @@ from time import sleep from typing import Dict, Any, Tuple -from app.actions import BaseAction from app.helper.module import ModuleHelper from app.log import logger from app.schemas import Action, ActionContext @@ -14,7 +13,7 @@ class WorkFlowManager(metaclass=Singleton): """ # 所有动作定义 - _actions: Dict[str, BaseAction] = {} + _actions: Dict[str, Any] = {} def __init__(self): self.init() @@ -59,14 +58,18 @@ class WorkFlowManager(metaclass=Singleton): if not context: context = ActionContext() if action.id in self._actions: - action_obj = self._actions[action.id] + # 实例化 + action_obj = self._actions[action.id]() + # 执行 logger.info(f"执行动作: {action.id} - {action.name}") result_context = action_obj.execute(action.params, context) logger.info(f"{action.name} 执行结果: {action_obj.success}") if action.loop and action.loop_interval: while not action_obj.done: + # 等待 logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行") sleep(action.loop_interval) + # 执行 logger.info(f"继续执行动作: {action.id} - {action.name}") result_context = action_obj.execute(action.params, result_context) logger.info(f"{action.name} 执行结果: {action_obj.success}") diff --git a/app/modules/qbittorrent/__init__.py b/app/modules/qbittorrent/__init__.py index 8e1ae650..5af555cc 100644 --- a/app/modules/qbittorrent/__init__.py +++ b/app/modules/qbittorrent/__init__.py @@ -239,7 +239,9 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]): path=torrent_path, hash=torrent.get('hash'), size=torrent.get('total_size'), - tags=torrent.get('tags') + tags=torrent.get('tags'), + progress=torrent.get('progress') * 100, + state="paused" if torrent.get('state') in ("paused", "pausedDL") else "downloading", )) elif status == TorrentStatus.TRANSFER: # 获取已完成且未整理的 diff --git a/app/modules/transmission/__init__.py b/app/modules/transmission/__init__.py index eb6b967d..9411ccee 100644 --- a/app/modules/transmission/__init__.py +++ b/app/modules/transmission/__init__.py @@ -246,7 +246,9 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]): title=torrent.name, path=Path(torrent.download_dir) / torrent.name, hash=torrent.hashString, - tags=",".join(torrent.labels or []) + tags=",".join(torrent.labels or []), + progress=torrent.progress, + state="paused" if torrent.status == "stopped" else "downloading", )) elif status == TorrentStatus.DOWNLOADING: # 获取正在下载的任务 diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index d4a8730d..442b3056 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -21,6 +21,8 @@ class TransferTorrent(BaseModel): tags: Optional[str] = None size: Optional[int] = 0 userid: Optional[str] = None + progress: Optional[float] = 0 + state: Optional[str] = None class DownloadingTorrent(BaseModel):