diff --git a/app/chain/transfer.py b/app/chain/transfer.py index f643df43..3bb1b43e 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1,10 +1,12 @@ import queue import re import threading +from copy import deepcopy from pathlib import Path from queue import Queue from typing import List, Optional, Tuple, Union, Dict, Callable, Any +from app import schemas from app.chain import ChainBase from app.chain.media import MediaChain from app.chain.storage import StorageChain @@ -41,13 +43,20 @@ class JobManager: # 整理中的作业 """ { - meidaid: [{ - "task": TransferTask, - "state": "running" | "waiting" | "failed" | "completed", - }] + meidaid: { + "media": schema.MediaInfo, + "season": int, + "tasks": [ + { + "fileitem": schema.FileItem, + "meta": schema.MetaBase, + "state": "running" | "waiting" | "failed" | "completed", + } + ] + } } """ - _job_view: Dict[Tuple, List[Dict[str, Any]]] = {} + _job_view: Dict[Tuple, Dict[str, Any]] = {} # 汇总季集清单 _season_episodes: Dict[Tuple, List[int]] = {} @@ -65,21 +74,46 @@ class JobManager: else: return media.tmdb_id or media.douban_id, season + @staticmethod + def __get_media(task: TransferTask) -> schemas.MediaInfo: + """ + 获取媒体信息 + """ + mediainfo = deepcopy(task.mediainfo) + mediainfo.clear() + return schemas.MediaInfo(**mediainfo.to_dict()) + + @staticmethod + def __get_meta(task: TransferTask) -> schemas.MetaInfo: + """ + 获取元数据 + """ + return schemas.MetaInfo(**task.meta.to_dict()) + def add_task(self, task: TransferTask): """ 添加整理任务 """ - if not task or not task.mediainfo or not task.meta: + if not any([task, task.mediainfo, task.meta, task.fileitem]): return with job_lock: __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: - self._job_view[__mediaid__] = [] - # 添加任务 - self._job_view[__mediaid__].append({ - "task": task, - "state": "waiting" - }) + self._job_view[__mediaid__] = { + "media": self.__get_media(task), + "season": task.meta.begin_season, + "tasks": [{ + "state": "waiting", + "fileitem": task.fileitem, + "meta": self.__get_meta(task) + }] + } + else: + self._job_view[__mediaid__]["tasks"].append({ + "state": "waiting", + "fileitem": task.fileitem, + "meta": self.__get_meta(task) + }) # 添加季集信息 if self._season_episodes.get(__mediaid__): self._season_episodes[__mediaid__].extend(task.meta.episode_list) @@ -91,15 +125,13 @@ class JobManager: """ 任务运行中 """ - if not task or not task.mediainfo or not task.meta: - return with job_lock: __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]: - if job["task"] == task: + for job in self._job_view[__mediaid__]["tasks"]: + if job["fileitem"] == task.fileitem: job["state"] = "running" break @@ -107,15 +139,13 @@ class JobManager: """ 任务完成 """ - if not task or not task.mediainfo or not task.meta: - return with job_lock: __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]: - if job["task"] == task: + for job in self._job_view[__mediaid__]["tasks"]: + if job["fileitem"] == task.fileitem: job["state"] = "completed" break @@ -130,16 +160,17 @@ class JobManager: if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]: - if job["task"] == task: + for job in self._job_view[__mediaid__]["tasks"]: + if job["fileitem"] == task.fileitem: job["state"] = "failed" break # 移除剧集信息 if __mediaid__ in self._season_episodes: self._season_episodes[__mediaid__] = list( - set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list)) + set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list) + ) - def remove_job(self, media: MediaInfo, season: int = None) -> Optional[List[Dict[str, Any]]]: + def remove_job(self, media: MediaInfo, season: int = None) -> Optional[Dict[str, Any]]: """ 移除作业 """ @@ -154,7 +185,7 @@ class JobManager: if __mediaid__ in self._season_episodes: self._season_episodes.pop(__mediaid__) - def is_finished(self, media: MediaInfo, season: int = None): + def is_finished(self, media: MediaInfo, season: int = None) -> bool: """ 任务检查某项任务是否全部为已完成状态 """ @@ -162,22 +193,22 @@ class JobManager: return False __mediaid__ = self.__get_id(media=media, season=season) if __mediaid__ not in self._job_view: - return True + return False with job_lock: - return all([job["state"] in ["completed", "failed"] for job in self._job_view[__mediaid__]]) + return all([job["state"] == "completed" for job in self._job_view[__mediaid__]["tasks"]]) - def total(self): + def total(self) -> int: """ 获取所有task任务总数 """ with job_lock: return sum([len(jobs) for jobs in self._job_view.values()]) - def list_jobs(self) -> Dict: + def list_jobs(self) -> List[dict]: """ 获取任务列表 """ - return self._job_view + return list(self._job_view.values()) def season_episodes(self, media: MediaInfo, season: int = None) -> List[int]: """ @@ -301,7 +332,11 @@ class TransferChain(ChainBase, metaclass=Singleton): if transferinfo.need_notify: se_str = None if task.mediainfo.type == MediaType.TV: - se_str = f"{task.meta.season} {StringUtils.format_ep(self.jobview.season_episodes(task.mediainfo, task.meta.begin_season))}" + season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season) + if season_episodes: + se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}" + else: + se_str = f"{task.meta.season}" self.send_transfer_message(meta=task.meta, mediainfo=task.mediainfo, transferinfo=transferinfo,