From 2b40e429653b629372903356f260e53901f34a2e Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 27 Dec 2024 21:16:38 +0800 Subject: [PATCH] fix bug --- app/api/endpoints/transfer.py | 2 +- app/chain/transfer.py | 213 +++++++++++++++++++--------------- app/schemas/transfer.py | 21 ++++ 3 files changed, 141 insertions(+), 95 deletions(-) diff --git a/app/api/endpoints/transfer.py b/app/api/endpoints/transfer.py index 4d889b82..bbe1ccbd 100644 --- a/app/api/endpoints/transfer.py +++ b/app/api/endpoints/transfer.py @@ -47,7 +47,7 @@ def query_name(path: str, filetype: str, }) -@router.get("/queue", summary="查询整理队列", response_model=List[dict]) +@router.get("/queue", summary="查询整理队列", response_model=List[schemas.TransferJob]) def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询整理队列 diff --git a/app/chain/transfer.py b/app/chain/transfer.py index f57ef771..4cbbd803 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -4,7 +4,7 @@ import threading from copy import deepcopy from pathlib import Path from queue import Queue -from typing import List, Optional, Tuple, Union, Dict, Callable, Any +from typing import List, Optional, Tuple, Union, Dict, Callable from app import schemas from app.chain import ChainBase @@ -30,6 +30,7 @@ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, SystemConfigKey from app.utils.singleton import Singleton from app.utils.string import StringUtils +from schemas import TransferJob, TransferJobTask downloader_lock = threading.Lock() job_lock = threading.Lock() @@ -41,22 +42,7 @@ class JobManager: """ # 整理中的作业 - """ - { - meidaid: { - "media": schema.MediaInfo, - "season": int, - "tasks": [ - { - "fileitem": schema.FileItem, - "meta": schema.MetaInfo, - "state": "running" | "waiting" | "failed" | "completed", - } - ] - } - } - """ - _job_view: Dict[Tuple, Dict[str, Any]] = {} + _job_view: Dict[Tuple, TransferJob] = {} # 汇总季集清单 _season_episodes: Dict[Tuple, List[int]] = {} @@ -65,17 +51,27 @@ class JobManager: self._season_episodes = {} @staticmethod - def __get_id(task: TransferTask = None, media: MediaInfo = None, season: int = None) -> Tuple: + def __get_meta_id(meta: MetaBase = None, season: int = None) -> Tuple: + """ + 获取元数据ID + """ + return meta.name, season + + @staticmethod + def __get_media_id(media: MediaInfo = None, season: int = None) -> Tuple: + """ + 获取媒体ID + """ + return media.tmdb_id or media.douban_id, season + + def __get_id(self, task: TransferTask = None) -> Tuple: """ 获取作业ID """ - if task: - if task.mediainfo: - return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season - else: - return task.meta.name, task.meta.begin_season + if task.mediainfo: + return self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) else: - return media.tmdb_id or media.douban_id, season + return self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) @staticmethod def __get_media(task: TransferTask) -> schemas.MediaInfo: @@ -110,24 +106,30 @@ class JobManager: """ if not any([task, task.meta, task.fileitem]): return - with job_lock: + with (job_lock): __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: - self._job_view[__mediaid__] = { - "media": self.__get_media(task), - "season": task.meta.begin_season, - "tasks": [{ - "state": state, - "fileitem": task.fileitem, - "meta": self.__get_meta(task) - }] - } + self._job_view[__mediaid__] = TransferJob( + media=self.__get_media(task), + season=task.meta.begin_season, + tasks=[TransferJobTask( + fileitem=task.fileitem, + meta=self.__get_meta(task), + downloader=task.downloader, + download_hash=task.download_hash, + state=state + )] + ) else: - self._job_view[__mediaid__]["tasks"].append({ - "state": state, - "fileitem": task.fileitem, - "meta": self.__get_meta(task) - }) + self._job_view[__mediaid__].tasks.append( + TransferJobTask( + fileitem=task.fileitem, + meta=self.__get_meta(task), + downloader=task.downloader, + download_hash=task.download_hash, + state=state + ) + ) # 添加季集信息 if self._season_episodes.get(__mediaid__): self._season_episodes[__mediaid__].extend(task.meta.episode_list) @@ -144,9 +146,9 @@ class JobManager: if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]["tasks"]: - if job["fileitem"] == task.fileitem: - job["state"] = "running" + for t in self._job_view[__mediaid__].tasks: + if t.fileitem == task.fileitem: + t.state = "running" break def finish_task(self, task: TransferTask): @@ -158,9 +160,9 @@ class JobManager: if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]["tasks"]: - if job["fileitem"] == task.fileitem: - job["state"] = "completed" + for t in self._job_view[__mediaid__].tasks: + if t.fileitem == task.fileitem: + t.state = "completed" break def fail_task(self, task: TransferTask): @@ -172,9 +174,9 @@ class JobManager: if __mediaid__ not in self._job_view: return # 更新状态 - for job in self._job_view[__mediaid__]["tasks"]: - if job["fileitem"] == task.fileitem: - job["state"] = "failed" + for t in self._job_view[__mediaid__].tasks: + if t.fileitem == task.fileitem: + t.state = "failed" break # 移除剧集信息 if __mediaid__ in self._season_episodes: @@ -182,18 +184,18 @@ class JobManager: set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list) ) - def remove_task(self, fileitem: FileItem) -> Optional[Dict[str, Any]]: + def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]: """ - 移除整理任务 + 移除所有作业中的整理任务 """ with job_lock: for mediaid in list(self._job_view): job = self._job_view[mediaid] - for task in job["tasks"]: - if task["fileitem"] == fileitem: - job["tasks"].remove(task) + for task in job.tasks: + if task.fileitem == fileitem: + job.tasks.remove(task) # 如果没有作业了,则移除作业 - if not job["tasks"]: + if not job.tasks: self._job_view.pop(mediaid) # 移除季集信息 if mediaid in self._season_episodes: @@ -202,11 +204,11 @@ class JobManager: ) return task - def remove_job(self, media: MediaInfo, season: int = None) -> Optional[Dict[str, Any]]: + def remove_job(self, task: TransferTask) -> Optional[TransferJob]: """ 移除作业 """ - __mediaid__ = self.__get_id(media=media, season=season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) with job_lock: # 移除作业 if __mediaid__ in self._job_view: @@ -215,75 +217,98 @@ class JobManager: self._season_episodes.pop(__mediaid__) return self._job_view.pop(__mediaid__) - def is_done(self, media: MediaInfo, season: int = None) -> bool: + def is_done(self, task: TransferTask) -> bool: """ 检查某项作业是否整理完成(不管成功还是失败) """ - __mediaid__ = self.__get_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return False - with job_lock: - return all( - [task["state"] in ["completed", "failed"] for task in self._job_view[__mediaid__]["tasks"]] + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __metaid__ in self._job_view: + meta_done = all( + task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks ) + else: + meta_done = True + if __mediaid__ in self._job_view: + media_done = all( + task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks + ) + else: + media_done = False + return meta_done and media_done - def is_finished(self, media: MediaInfo, season: int = None) -> bool: + def is_finished(self, task: TransferTask) -> bool: """ 检查某项作业是否已完成且有成功的记录 """ - __mediaid__ = self.__get_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return False - with job_lock: - # 所有任务都是完成或者失败状态,且有完成的记录 - tasks = self._job_view[__mediaid__]["tasks"] - return True if all( - [task["state"] in ["completed", "failed"] for task in tasks] + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __metaid__ in self._job_view: + meta_finished = all( + task["state"] in ["completed", "failed"] for task in self._job_view[__metaid__].tasks + ) + else: + meta_finished = True + if __mediaid__ in self._job_view: + tasks = self._job_view[__mediaid__].tasks + media_finished = all( + task["state"] in ["completed", "failed"] for task in tasks ) and any( - [task["state"] == "completed" for task in tasks] - ) else False + task["state"] == "completed" for task in tasks + ) + else: + media_finished = False + return meta_finished and media_finished - def is_success(self, media: MediaInfo, season: int = None) -> bool: + def is_success(self, task: TransferTask) -> bool: """ 检查某项作业是否全部成功 """ - __mediaid__ = self.__get_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return False - with job_lock: - return all( - [task["state"] in ["completed"] for task in self._job_view[__mediaid__]["tasks"]] + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __mediaid__ in self._job_view: + meta_success = all( + task["state"] in ["completed"] for task in self._job_view[__metaid__].tasks ) + else: + meta_success = True + if __mediaid__ in self._job_view: + media_success = all( + task["state"] in ["completed"] for task in self._job_view[__mediaid__].tasks + ) + else: + media_success = False + return meta_success and media_success - def get_success_tasks(self, media: MediaInfo, season: int = None) -> List[dict]: + def get_success_tasks(self, media: MediaInfo, season: int = None) -> List[TransferJobTask]: """ 获取某项任务成功的任务 """ - __mediaid__ = self.__get_id(media=media, season=season) + __mediaid__ = self.__get_media_id(media=media, season=season) with job_lock: if __mediaid__ not in self._job_view: return [] - return [task for task in self._job_view[__mediaid__]["tasks"] if task["state"] == "completed"] + return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"] def count(self, media: MediaInfo, season: int = None) -> int: """ 获取某项任务总数 """ - __mediaid__ = self.__get_id(media=media, season=season) + __mediaid__ = self.__get_media_id(media=media, season=season) with job_lock: # 计算状态为完成的任务数 if __mediaid__ not in self._job_view: return 0 - return len([task for task in self._job_view[__mediaid__]["tasks"] if task["state"] == "completed"]) + return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]) def total(self) -> int: """ 获取所有task任务总数 """ with job_lock: - return sum([len(job["tasks"]) for job in self._job_view.values()]) + return sum([len(job.tasks) for job in self._job_view.values()]) - def list_jobs(self) -> List[dict]: + def list_jobs(self) -> List[TransferJob]: """ 获取任务列表 """ @@ -293,7 +318,7 @@ class JobManager: """ 获取季集清单 """ - __mediaid__ = self.__get_id(media=media, season=season) + __mediaid__ = self.__get_media_id(media=media, season=season) with job_lock: return self._season_episodes.get(__mediaid__) or [] @@ -394,7 +419,7 @@ class TransferChain(ChainBase, metaclass=Singleton): }) # 全部整理成功时 - if self.jobview.is_success(task.mediainfo, task.meta.begin_season): + if self.jobview.is_success(task): # 移动模式删除空目录 if transferinfo.transfer_type in ["move"]: # 所有成功的业务 @@ -409,7 +434,7 @@ class TransferChain(ChainBase, metaclass=Singleton): self.storagechain.delete_media_file(t.fileitem, delete_self=False) # 整理完成且有成功的任务时 - if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): + if self.jobview.is_finished(task): # 发送通知 if transferinfo.need_notify: se_str = None @@ -518,8 +543,8 @@ class TransferChain(ChainBase, metaclass=Singleton): text=__process_msg, key=ProgressKey.FileTransfer) # 移除已完成的任务 - if self.jobview.is_done(task.mediainfo, task.meta.begin_season): - self.jobview.remove_job(task.mediainfo, task.meta.begin_season) + if self.jobview.is_done(task): + self.jobview.remove_job(task) except queue.Empty: if not __queue_start: # 结束进度 @@ -601,7 +626,7 @@ class TransferChain(ChainBase, metaclass=Singleton): task.mediainfo = mediainfo # 更新队列任务 curr_task = self.jobview.remove_task(task.fileitem) - self.jobview.add_task(task, state=curr_task['state'] if curr_task else "waiting") + self.jobview.add_task(task, state=curr_task.state if curr_task else "waiting") # 查询整理目标目录 if not task.target_directory: diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index 1a4dd980..f4981b29 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field from app.schemas import TmdbEpisode, DownloadHistory from app.schemas.file import FileItem from app.schemas.system import TransferDirectoryConf +from schemas import MediaInfo, MetaInfo class TransferTorrent(BaseModel): @@ -74,6 +75,26 @@ class TransferTask(BaseModel): return dicts +class TransferJobTask(BaseModel): + """ + 文件整理作业任务 + """ + fileitem: Optional[FileItem] = None + meta: Optional[MetaInfo] = None + state: Optional[str] = None + downloader: Optional[str] = None + download_hash: Optional[str] = None + + +class TransferJob(BaseModel): + """ + 文件整理作业 + """ + media: Optional[MediaInfo] = None + season: Optional[int] = None + tasks: Optional[List[TransferJobTask]] = Field(default_factory=list) + + class TransferInfo(BaseModel): """ 文件整理结果