This commit is contained in:
jxxghp
2024-12-27 21:16:38 +08:00
parent 0eac7816bc
commit 2b40e42965
3 changed files with 141 additions and 95 deletions

View File

@@ -47,7 +47,7 @@ def query_name(path: str, filetype: str,
})
@router.get("/queue", summary="查询整理队列", response_model=List[dict])
@router.get("/queue", summary="查询整理队列", response_model=List[schemas.TransferJob])
def query_queue(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询整理队列

View File

@@ -4,7 +4,7 @@ import threading
from copy import deepcopy
from pathlib import Path
from queue import Queue
from typing import List, Optional, Tuple, Union, Dict, Callable, Any
from typing import List, Optional, Tuple, Union, Dict, Callable
from app import schemas
from app.chain import ChainBase
@@ -30,6 +30,7 @@ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey,
SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from schemas import TransferJob, TransferJobTask
downloader_lock = threading.Lock()
job_lock = threading.Lock()
@@ -41,22 +42,7 @@ class JobManager:
"""
# 整理中的作业
"""
{
meidaid: {
"media": schema.MediaInfo,
"season": int,
"tasks": [
{
"fileitem": schema.FileItem,
"meta": schema.MetaInfo,
"state": "running" | "waiting" | "failed" | "completed",
}
]
}
}
"""
_job_view: Dict[Tuple, Dict[str, Any]] = {}
_job_view: Dict[Tuple, TransferJob] = {}
# 汇总季集清单
_season_episodes: Dict[Tuple, List[int]] = {}
@@ -65,17 +51,27 @@ class JobManager:
self._season_episodes = {}
@staticmethod
def __get_id(task: TransferTask = None, media: MediaInfo = None, season: int = None) -> Tuple:
def __get_meta_id(meta: MetaBase = None, season: int = None) -> Tuple:
"""
获取元数据ID
"""
return meta.name, season
@staticmethod
def __get_media_id(media: MediaInfo = None, season: int = None) -> Tuple:
"""
获取媒体ID
"""
return media.tmdb_id or media.douban_id, season
def __get_id(self, task: TransferTask = None) -> Tuple:
"""
获取作业ID
"""
if task:
if task.mediainfo:
return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season
else:
return task.meta.name, task.meta.begin_season
if task.mediainfo:
return self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
else:
return media.tmdb_id or media.douban_id, season
return self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
@staticmethod
def __get_media(task: TransferTask) -> schemas.MediaInfo:
@@ -110,24 +106,30 @@ class JobManager:
"""
if not any([task, task.meta, task.fileitem]):
return
with job_lock:
with (job_lock):
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = {
"media": self.__get_media(task),
"season": task.meta.begin_season,
"tasks": [{
"state": state,
"fileitem": task.fileitem,
"meta": self.__get_meta(task)
}]
}
self._job_view[__mediaid__] = TransferJob(
media=self.__get_media(task),
season=task.meta.begin_season,
tasks=[TransferJobTask(
fileitem=task.fileitem,
meta=self.__get_meta(task),
downloader=task.downloader,
download_hash=task.download_hash,
state=state
)]
)
else:
self._job_view[__mediaid__]["tasks"].append({
"state": state,
"fileitem": task.fileitem,
"meta": self.__get_meta(task)
})
self._job_view[__mediaid__].tasks.append(
TransferJobTask(
fileitem=task.fileitem,
meta=self.__get_meta(task),
downloader=task.downloader,
download_hash=task.download_hash,
state=state
)
)
# 添加季集信息
if self._season_episodes.get(__mediaid__):
self._season_episodes[__mediaid__].extend(task.meta.episode_list)
@@ -144,9 +146,9 @@ class JobManager:
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]["tasks"]:
if job["fileitem"] == task.fileitem:
job["state"] = "running"
for t in self._job_view[__mediaid__].tasks:
if t.fileitem == task.fileitem:
t.state = "running"
break
def finish_task(self, task: TransferTask):
@@ -158,9 +160,9 @@ class JobManager:
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]["tasks"]:
if job["fileitem"] == task.fileitem:
job["state"] = "completed"
for t in self._job_view[__mediaid__].tasks:
if t.fileitem == task.fileitem:
t.state = "completed"
break
def fail_task(self, task: TransferTask):
@@ -172,9 +174,9 @@ class JobManager:
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]["tasks"]:
if job["fileitem"] == task.fileitem:
job["state"] = "failed"
for t in self._job_view[__mediaid__].tasks:
if t.fileitem == task.fileitem:
t.state = "failed"
break
# 移除剧集信息
if __mediaid__ in self._season_episodes:
@@ -182,18 +184,18 @@ class JobManager:
set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list)
)
def remove_task(self, fileitem: FileItem) -> Optional[Dict[str, Any]]:
def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]:
"""
移除整理任务
移除所有作业中的整理任务
"""
with job_lock:
for mediaid in list(self._job_view):
job = self._job_view[mediaid]
for task in job["tasks"]:
if task["fileitem"] == fileitem:
job["tasks"].remove(task)
for task in job.tasks:
if task.fileitem == fileitem:
job.tasks.remove(task)
# 如果没有作业了,则移除作业
if not job["tasks"]:
if not job.tasks:
self._job_view.pop(mediaid)
# 移除季集信息
if mediaid in self._season_episodes:
@@ -202,11 +204,11 @@ class JobManager:
)
return task
def remove_job(self, media: MediaInfo, season: int = None) -> Optional[Dict[str, Any]]:
def remove_job(self, task: TransferTask) -> Optional[TransferJob]:
"""
移除作业
"""
__mediaid__ = self.__get_id(media=media, season=season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
with job_lock:
# 移除作业
if __mediaid__ in self._job_view:
@@ -215,75 +217,98 @@ class JobManager:
self._season_episodes.pop(__mediaid__)
return self._job_view.pop(__mediaid__)
def is_done(self, media: MediaInfo, season: int = None) -> bool:
def is_done(self, task: TransferTask) -> bool:
"""
检查某项作业是否整理完成(不管成功还是失败)
"""
__mediaid__ = self.__get_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return False
with job_lock:
return all(
[task["state"] in ["completed", "failed"] for task in self._job_view[__mediaid__]["tasks"]]
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_done = True
if __mediaid__ in self._job_view:
media_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_done = False
return meta_done and media_done
def is_finished(self, media: MediaInfo, season: int = None) -> bool:
def is_finished(self, task: TransferTask) -> bool:
"""
检查某项作业是否已完成且有成功的记录
"""
__mediaid__ = self.__get_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return False
with job_lock:
# 所有任务都是完成或者失败状态,且有完成的记录
tasks = self._job_view[__mediaid__]["tasks"]
return True if all(
[task["state"] in ["completed", "failed"] for task in tasks]
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_finished = all(
task["state"] in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_finished = True
if __mediaid__ in self._job_view:
tasks = self._job_view[__mediaid__].tasks
media_finished = all(
task["state"] in ["completed", "failed"] for task in tasks
) and any(
[task["state"] == "completed" for task in tasks]
) else False
task["state"] == "completed" for task in tasks
)
else:
media_finished = False
return meta_finished and media_finished
def is_success(self, media: MediaInfo, season: int = None) -> bool:
def is_success(self, task: TransferTask) -> bool:
"""
检查某项作业是否全部成功
"""
__mediaid__ = self.__get_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return False
with job_lock:
return all(
[task["state"] in ["completed"] for task in self._job_view[__mediaid__]["tasks"]]
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __mediaid__ in self._job_view:
meta_success = all(
task["state"] in ["completed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_success = True
if __mediaid__ in self._job_view:
media_success = all(
task["state"] in ["completed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_success = False
return meta_success and media_success
def get_success_tasks(self, media: MediaInfo, season: int = None) -> List[dict]:
def get_success_tasks(self, media: MediaInfo, season: int = None) -> List[TransferJobTask]:
"""
获取某项任务成功的任务
"""
__mediaid__ = self.__get_id(media=media, season=season)
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__]["tasks"] if task["state"] == "completed"]
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
def count(self, media: MediaInfo, season: int = None) -> int:
"""
获取某项任务总数
"""
__mediaid__ = self.__get_id(media=media, season=season)
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
# 计算状态为完成的任务数
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__]["tasks"] if task["state"] == "completed"])
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
def total(self) -> int:
"""
获取所有task任务总数
"""
with job_lock:
return sum([len(job["tasks"]) for job in self._job_view.values()])
return sum([len(job.tasks) for job in self._job_view.values()])
def list_jobs(self) -> List[dict]:
def list_jobs(self) -> List[TransferJob]:
"""
获取任务列表
"""
@@ -293,7 +318,7 @@ class JobManager:
"""
获取季集清单
"""
__mediaid__ = self.__get_id(media=media, season=season)
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
return self._season_episodes.get(__mediaid__) or []
@@ -394,7 +419,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
})
# 全部整理成功时
if self.jobview.is_success(task.mediainfo, task.meta.begin_season):
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
@@ -409,7 +434,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
self.storagechain.delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task.mediainfo, task.meta.begin_season):
if self.jobview.is_finished(task):
# 发送通知
if transferinfo.need_notify:
se_str = None
@@ -518,8 +543,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=__process_msg,
key=ProgressKey.FileTransfer)
# 移除已完成的任务
if self.jobview.is_done(task.mediainfo, task.meta.begin_season):
self.jobview.remove_job(task.mediainfo, task.meta.begin_season)
if self.jobview.is_done(task):
self.jobview.remove_job(task)
except queue.Empty:
if not __queue_start:
# 结束进度
@@ -601,7 +626,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
task.mediainfo = mediainfo
# 更新队列任务
curr_task = self.jobview.remove_task(task.fileitem)
self.jobview.add_task(task, state=curr_task['state'] if curr_task else "waiting")
self.jobview.add_task(task, state=curr_task.state if curr_task else "waiting")
# 查询整理目标目录
if not task.target_directory:

View File

@@ -6,6 +6,7 @@ from pydantic import BaseModel, Field
from app.schemas import TmdbEpisode, DownloadHistory
from app.schemas.file import FileItem
from app.schemas.system import TransferDirectoryConf
from schemas import MediaInfo, MetaInfo
class TransferTorrent(BaseModel):
@@ -74,6 +75,26 @@ class TransferTask(BaseModel):
return dicts
class TransferJobTask(BaseModel):
"""
文件整理作业任务
"""
fileitem: Optional[FileItem] = None
meta: Optional[MetaInfo] = None
state: Optional[str] = None
downloader: Optional[str] = None
download_hash: Optional[str] = None
class TransferJob(BaseModel):
"""
文件整理作业
"""
media: Optional[MediaInfo] = None
season: Optional[int] = None
tasks: Optional[List[TransferJobTask]] = Field(default_factory=list)
class TransferInfo(BaseModel):
"""
文件整理结果