diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 75fbf61a..230e4b55 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -30,7 +30,164 @@ from app.utils.singleton import Singleton from app.utils.string import StringUtils downloader_lock = threading.Lock() -tree_lock = threading.Lock() +job_lock = threading.Lock() + + +class JobManager: + """ + 作业管理器 + """ + + # 整理中的作业 + """ + { + meidaid: [{ + "task": TransferTask, + "state": "running" | "waiting" | "failed" | "completed", + }] + } + """ + _job_view: Dict[Tuple, List[Dict[str, Any]]] = {} + # 汇总季集清单 + _season_episodes: Dict[Tuple, List[int]] = {} + + def __init__(self): + self._job_view = {} + self._season_episodes = {} + + @staticmethod + def __get_id(task: TransferTask = None, media: MediaInfo = None, season: int = None) -> Tuple: + """ + 获取作业ID + """ + if task: + return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season + else: + return media.tmdb_id or media.douban_id, season + + def add_task(self, task: TransferTask): + """ + 添加整理任务 + """ + if not task or not task.mediainfo or not task.meta: + return + with job_lock: + __mediaid__ = self.__get_id(task) + if __mediaid__ not in self._job_view: + self._job_view[__mediaid__] = [] + # 添加任务 + self._job_view[__mediaid__].append({ + "task": task, + "state": "waiting" + }) + # 添加季集信息 + if self._season_episodes.get(__mediaid__): + self._season_episodes[__mediaid__].extend(task.meta.episode_list) + self._season_episodes[__mediaid__] = list(set(self._season_episodes[__mediaid__])) + else: + self._season_episodes[__mediaid__] = task.meta.episode_list + + def running_task(self, task: TransferTask): + """ + 任务运行中 + """ + if not task or not task.mediainfo or not task.meta: + return + with job_lock: + __mediaid__ = self.__get_id(task) + if __mediaid__ not in self._job_view: + return + # 更新状态 + for job in self._job_view[__mediaid__]: + if job["task"] == task: + job["state"] = "running" + break + + def finish_task(self, task: TransferTask): + """ + 任务完成 + """ + if not task or not task.mediainfo or not task.meta: + return + with job_lock: + __mediaid__ = self.__get_id(task) + if __mediaid__ not in self._job_view: + return + # 更新状态 + for job in self._job_view[__mediaid__]: + if job["task"] == task: + job["state"] = "completed" + break + + def fail_task(self, task: TransferTask): + """ + 任务失败 + """ + if not task or not task.mediainfo or not task.meta: + return + with job_lock: + __mediaid__ = self.__get_id(task) + if __mediaid__ not in self._job_view: + return + # 更新状态 + for job in self._job_view[__mediaid__]: + if job["task"] == task: + job["state"] = "failed" + break + # 移除剧集信息 + if __mediaid__ in self._season_episodes: + self._season_episodes[__mediaid__] = list( + set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list)) + + def remove_job(self, media: MediaInfo, season: int = None) -> Optional[List[Dict[str, Any]]]: + """ + 移除作业 + """ + if not media: + return + __mediaid__ = self.__get_id(media=media, season=season) + with job_lock: + # 移除作业 + if __mediaid__ in self._job_view: + return self._job_view.pop(__mediaid__) + # 移除季集信息 + if __mediaid__ in self._season_episodes: + self._season_episodes.pop(__mediaid__) + + def is_finished(self, media: MediaInfo, season: int = None): + """ + 任务检查某项任务是否全部为已完成状态 + """ + if not media: + return False + __mediaid__ = self.__get_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return True + with job_lock: + return all([job["state"] in ["completed", "failed"] for job in self._job_view[__mediaid__]]) + + def total(self): + """ + 获取所有task任务总数 + """ + with job_lock: + return sum([len(jobs) for jobs in self._job_view.values()]) + + def list_jobs(self) -> Dict: + """ + 获取任务列表 + """ + return self._job_view + + def season_episodes(self, media: MediaInfo, season: int = None) -> List[int]: + """ + 获取季集清单 + """ + if not media: + return [] + __mediaid__ = self.__get_id(media=media, season=season) + with job_lock: + return self._season_episodes.get(__mediaid__) or [] class TransferChain(ChainBase, metaclass=Singleton): @@ -44,9 +201,6 @@ class TransferChain(ChainBase, metaclass=Singleton): # 待整理任务队列 _queue = Queue() - # 整理中的目录树 - _job_tree: Dict[str, Dict[str, Any]] = {} - # 文件整理线程 _transfer_thread = None @@ -63,6 +217,7 @@ class TransferChain(ChainBase, metaclass=Singleton): self.storagechain = StorageChain() self.systemconfig = SystemConfigOper() self.directoryhelper = DirectoryHelper() + self.jobview = JobManager() # 启动整理任务 self.__init() @@ -104,6 +259,11 @@ class TransferChain(ChainBase, metaclass=Singleton): image=task.mediainfo.get_message_image(), link=settings.MP_DOMAIN('#/history') )) + # 任务失败 + self.jobview.fail_task(task) + # 移除已完成的任务 + if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): + self.jobview.remove_job(task.mediainfo, task.meta.begin_season) return # 转移成功 @@ -130,8 +290,8 @@ class TransferChain(ChainBase, metaclass=Singleton): 'download_hash': task.download_hash, }) - # TODO 整理完成一个媒体项时 - if True: + # 整理完成一个媒体项时 + if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): # 移动模式删除空目录 if transferinfo.transfer_type in ["move"]: # 下载器hash @@ -142,24 +302,24 @@ class TransferChain(ChainBase, metaclass=Singleton): if task.fileitem: self.storagechain.delete_media_file(task.fileitem, delete_self=False) - # TODO 发送通知 - """ + # 发送通知 if transferinfo.need_notify: se_str = None - if task.media.type == MediaType.TV: - se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}" - self.send_transfer_message(meta=transfer_meta, - mediainfo=media, - transferinfo=transfer_info, + if task.mediainfo.type == MediaType.TV: + se_str = f"{task.meta.season} {StringUtils.format_ep(self.jobview.season_episodes(task.mediainfo, task.meta.begin_season))}" + self.send_transfer_message(meta=task.meta, + mediainfo=task.mediainfo, + transferinfo=transferinfo, season_episode=se_str) - # TODO 刮削事件 - if scrape or transfer_info.need_scrape: + # 刮削事件 + if transferinfo.need_scrape: self.eventmanager.send_event(EventType.MetadataScrape, { - 'meta': transfer_meta, - 'mediainfo': media, - 'fileitem': transfer_info.target_diritem + 'meta': task.meta, + 'mediainfo': task.mediainfo, + 'fileitem': transferinfo.target_diritem }) - """ + # 清除作业 + self.jobview.remove_job(task.mediainfo, task.meta.begin_season) def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None): """ @@ -169,8 +329,8 @@ class TransferChain(ChainBase, metaclass=Singleton): """ if not task: return - # TODO 维护目录树 - + # 维护整理任务视图 + self.jobview.add_task(task) # 添加到队列 self._queue.put(TransferQueue( task=task, @@ -181,34 +341,32 @@ class TransferChain(ChainBase, metaclass=Singleton): """ 处理队列 """ + # 队列开始标识 + __queue_start = True # 任务总数 total_num = 0 # 已处理总数 processed_num = 0 # 失败数量 fail_num = 0 - # 跳过数量 - skip_num = 0 - # 队列开始标识 - __queue_start = True while not global_vars.is_system_stopped: try: item: TransferQueue = self._queue.get(timeout=self._transfer_interval) - if item: + if item and item.task: + self.jobview.running_task(item.task) if __queue_start: logger.info("开始整理队列处理...") # 启动进度 self.progress.start(ProgressKey.FileTransfer) - # TODO 计算总数 - total_num = 0 + total_num = self.jobview.total() self.progress.update(value=0, text=f"开始整理队列处理,共 {total_num} 个文件或子目录 ...", key=ProgressKey.FileTransfer) # 队列已开始 __queue_start = False - # TODO 重新计算总数 - total_num = 0 + # 重新计算总数 + total_num = self.jobview.total() # 更新进度 __process_msg = f"正在整理 ({processed_num + 1}/{total_num}){item.task.fileitem.name} ..." logger.info(__process_msg) @@ -216,11 +374,14 @@ class TransferChain(ChainBase, metaclass=Singleton): text=__process_msg, key=ProgressKey.FileTransfer) # 整理 - self.__handle_transfer(task=item.task, callback=item.callback) + if self.__handle_transfer(task=item.task, callback=item.callback): + processed_num += 1 + else: + fail_num += 1 except queue.Empty: if not __queue_start: # 结束进度 - __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个,跳过 {skip_num} 个" + __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) self.progress.update(value=100, text=__end_msg, @@ -230,19 +391,18 @@ class TransferChain(ChainBase, metaclass=Singleton): total_num = 0 processed_num = 0 fail_num = 0 - skip_num = 0 # 标记为新队列 __queue_start = True continue except Exception as e: logger.error(f"整理队列处理出现错误:{e}") - def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None): + def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None) -> bool: """ 处理整理任务 """ if not task: - return + return False # 执行整理 transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem, meta=task.meta, @@ -257,10 +417,11 @@ class TransferChain(ChainBase, metaclass=Singleton): library_category_folder=task.library_category_folder) if not transferinfo: logger.error("文件整理模块运行失败") - return + return False # 回调,位置传参:任务、整理结果 if callback: - callback(task, transferinfo) + return callback(task, transferinfo) + return False def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]: """ @@ -363,6 +524,7 @@ class TransferChain(ChainBase, metaclass=Singleton): def __get_trans_fileitems(self, fileitem: FileItem) -> List[Tuple[FileItem, bool]]: """ 获取整理目录或文件列表 + :param fileitem: 文件项 """ def __is_bluray_dir(_fileitem: FileItem) -> bool: @@ -397,16 +559,18 @@ class TransferChain(ChainBase, metaclass=Singleton): # 蓝光原盘子目录或文件 if __is_bluray_sub(fileitem.path): - return [(__get_bluray_dir(Path(fileitem.path)), True)] - - # 蓝光原盘根目录 - if __is_bluray_dir(fileitem): - return [(fileitem, True)] + dir_item = __get_bluray_dir(fileitem.storage, Path(fileitem.path)) + if dir_item: + return [(dir_item, True)] # 单文件 if fileitem.type == "file": return [(fileitem, False)] + # 蓝光原盘根目录 + if __is_bluray_dir(fileitem): + return [(fileitem, True)] + # 需要整理的文件项列表 trans_items = [] # 先检查当前目录的下级目录,以支持合集的情况 @@ -461,6 +625,18 @@ class TransferChain(ChainBase, metaclass=Singleton): 返回:成功标识,错误信息 """ + def __is_allow_extensions(_ext: str) -> bool: + """ + 判断是否允许的扩展名 + """ + return True if not self.all_exts or f".{_ext.lower()}" in self.all_exts else False + + def __is_allow_filesize(_size: int, _min_filesize: int) -> bool: + """ + 判断是否满足最小文件大小 + """ + return True if not _min_filesize or _size > _min_filesize * 1024 * 1024 else False + # 自定义格式 formaterHandler = FormatParser(eformat=epformat.format, details=epformat.detail, @@ -469,19 +645,18 @@ class TransferChain(ChainBase, metaclass=Singleton): # 整理屏蔽词 transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords) - # 待整理文件列表 - file_items: List[Tuple[FileItem, bool]] = [] # 汇总错误信息 err_msgs: List[str] = [] - - # 获取待整理路径清单 + # 待整理目录或文件项 trans_items = self.__get_trans_fileitems(fileitem) + # 待整理的文件列表 + file_items: List[Tuple[FileItem, bool]] = [] if not trans_items: logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" - # 处理所有待整理目录或文件 + # 转换为所有待处理的文件清单 for trans_item, bluray_dir in trans_items: # 如果是目录且不是⼀蓝光原盘,获取所有文件并整理 if trans_item.type == "dir" and not bluray_dir: @@ -491,21 +666,18 @@ class TransferChain(ChainBase, metaclass=Singleton): else: file_items.append((trans_item, bluray_dir)) + # 有集自定义格式,过滤文件 if formaterHandler: - # 有集自定义格式,过滤文件 file_items = [f for f in file_items if formaterHandler.match(f[0].name)] # 过滤后缀和大小 file_items = [f for f in file_items - if f[0].extension and (f".{f[0].extension.lower()}" in self.all_exts - and (not min_filesize or f[0].size > min_filesize * 1024 * 1024))] + if __is_allow_extensions(f[0].extension) and __is_allow_filesize(f[0].size, min_filesize)] if not file_items: logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" - # 更新总文件数 - total_num = len(file_items) - logger.info(f"正在整理 {total_num} 个文件...") + logger.info(f"正在计划整理 {len(file_items)} 个文件...") # 整理所有文件 for file_item, bluray_dir in file_items: @@ -589,15 +761,16 @@ class TransferChain(ChainBase, metaclass=Singleton): file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type), tmdbid=download_history.tmdbid, doubanid=download_history.doubanid) - if mediainfo: + if file_mediainfo: # 更新自定义媒体类别 if download_history.media_category: - mediainfo.category = download_history.media_category + file_mediainfo.category = download_history.media_category else: # 识别媒体信息 file_mediainfo = self.mediachain.recognize_by_meta(file_meta) # 更新媒体图片 - self.obtain_images(mediainfo=file_mediainfo) + if file_mediainfo: + self.obtain_images(mediainfo=file_mediainfo) else: file_mediainfo = mediainfo @@ -661,21 +834,24 @@ class TransferChain(ChainBase, metaclass=Singleton): target_storage=target_storage) # 后台整理 - self.__handle_transfer(task=TransferTask( - fileitem=file_item, - meta=file_meta, - mediainfo=file_mediainfo, - target_directory=target_directory or dir_info, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - episodes_info=episodes_info, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder, - downloader=downloader, - download_hash=download_hash - )) + self.put_to_queue( + task=TransferTask( + fileitem=file_item, + meta=file_meta, + mediainfo=file_mediainfo, + target_directory=target_directory or dir_info, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + episodes_info=episodes_info, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + downloader=downloader, + download_hash=download_hash + ) + ) + logger.info(f"{file_path.name} 已添加到整理队列") return True, "\n".join(err_msgs) diff --git a/app/monitor.py b/app/monitor.py index 8c34c9bf..300c3228 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -21,7 +21,7 @@ from app.db.transferhistory_oper import TransferHistoryOper from app.helper.directory import DirectoryHelper from app.helper.message import MessageHelper from app.log import logger -from app.schemas import FileItem, TransferTask +from app.schemas import FileItem from app.utils.singleton import Singleton lock = Lock() @@ -43,10 +43,12 @@ class FileMonitorHandler(FileSystemEventHandler): self.callback = callback def on_created(self, event: FileSystemEvent): - self.callback.event_handler(event=event, text="创建", event_path=Path(event.src_path)) + self.callback.event_handler(event=event, text="创建", event_path=event.src_path, + size=Path(event.src_path).stat().st_size) def on_moved(self, event: FileSystemMovedEvent): - self.callback.event_handler(event=event, text="移动", event_path=Path(event.dest_path)) + self.callback.event_handler(event=event, text="移动", event_path=event.dest_path, + size=Path(event.dest_path).stat().st_size) class Monitor(metaclass=Singleton): @@ -194,41 +196,36 @@ class Monitor(metaclass=Singleton): new_files = new_snapshot.keys() - old_snapshot.keys() for new_file in new_files: # 添加到待整理队列 - self.__handle_file(storage=storage, event_path=Path(new_file)) + self.__handle_file(storage=storage, event_path=Path(new_file), + file_size=new_snapshot.get(new_file)) # 更新快照 self._storage_snapshot[storage] = new_snapshot - def event_handler(self, event, text: str, event_path: Path): + def event_handler(self, event, text: str, event_path: str, file_size: float = None): """ 处理文件变化 :param event: 事件 :param text: 事件描述 :param event_path: 事件文件路径 + :param file_size: 文件大小 """ if not event.is_directory: # 文件发生变化 logger.debug(f"文件 {event_path} 发生了 {text}") # 整理文件 - self.__handle_file(storage="local", event_path=event_path) + self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size) - def __transfer_queue(self, task: TransferTask): - """ - 添加到整理队列 - """ - # 加入整理队列,使用默认的回调函数 - self.transferchain.put_to_queue(task=task) - - def __handle_file(self, storage: str, event_path: Path): + def __handle_file(self, storage: str, event_path: Path, file_size: float = None): """ 整理一个文件 :param storage: 存储 :param event_path: 事件文件路径 + :param file_size: 文件大小 """ # 全程加锁 with lock: try: # 开始整理 - # TODO 缺少文件大小 self.transferchain.do_transfer( fileitem=FileItem( storage=storage, @@ -236,7 +233,8 @@ class Monitor(metaclass=Singleton): type="file", name=event_path.name, basename=event_path.stem, - extension=event_path.suffix[1:] + extension=event_path.suffix[1:], + size=file_size ), src_match=True )