diff --git a/app/chain/transfer.py b/app/chain/transfer.py index c95164ca..6ce7bab9 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -230,13 +230,14 @@ class TransferChain(ChainBase, metaclass=Singleton): self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) self._transfer_thread.start() - def __default_callback(self, task: TransferTask, transferinfo: TransferInfo, /): + def __default_callback(self, task: TransferTask, + transferinfo: TransferInfo, /) -> Tuple[bool, str]: """ 整理完成后处理 """ if not transferinfo: logger.error("文件转移模块运行失败") - return + return False, "文件转移模块运行失败" if not transferinfo.success: # 转移失败 @@ -264,7 +265,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 移除已完成的任务 if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): self.jobview.remove_job(task.mediainfo, task.meta.begin_season) - return + return False, transferinfo.message # 转移成功 logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") @@ -321,6 +322,8 @@ class TransferChain(ChainBase, metaclass=Singleton): # 清除作业 self.jobview.remove_job(task.mediainfo, task.meta.begin_season) + return True, "" + def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None): """ 添加到待整理队列 @@ -376,9 +379,11 @@ class TransferChain(ChainBase, metaclass=Singleton): text=__process_msg, key=ProgressKey.FileTransfer) # 整理 - if self.__handle_transfer(task=item.task, callback=item.callback): + state, err_msg = self.__handle_transfer(task=item.task, callback=item.callback) + if state: processed_num += 1 else: + logger.warn(f"{item.task.fileitem.name} 整理失败:{err_msg}") fail_num += 1 except queue.Empty: if not __queue_start: @@ -399,12 +404,13 @@ class TransferChain(ChainBase, metaclass=Singleton): except Exception as e: logger.error(f"整理队列处理出现错误:{e}") - def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None) -> bool: + def __handle_transfer(self, task: TransferTask, + callback: Optional[Callable] = None) -> Tuple[bool, str]: """ 处理整理任务 """ if not task: - return False + return False, "" # 执行整理 transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem, meta=task.meta, @@ -419,11 +425,13 @@ class TransferChain(ChainBase, metaclass=Singleton): library_category_folder=task.library_category_folder) if not transferinfo: logger.error("文件整理模块运行失败") - return False + return False, "文件整理模块运行失败" + # 回调,位置传参:任务、整理结果 if callback: return callback(task, transferinfo) - return False + + return transferinfo.success, transferinfo.message def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]: """ @@ -604,7 +612,8 @@ class TransferChain(ChainBase, metaclass=Singleton): library_type_folder: bool = None, library_category_folder: bool = None, season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0, downloader: str = None, download_hash: str = None, - force: bool = False, src_match: bool = False) -> Tuple[bool, str]: + force: bool = False, src_match: bool = False, + background: bool = True) -> Tuple[bool, str]: """ 执行一个复杂目录的整理操作 :param fileitem: 文件项 @@ -624,6 +633,7 @@ class TransferChain(ChainBase, metaclass=Singleton): :param download_hash: 下载记录hash :param force: 是否强制整理 :param src_match: 是否源目录匹配 + :param background: 是否后台运行 返回:成功标识,错误信息 """ @@ -639,6 +649,9 @@ class TransferChain(ChainBase, metaclass=Singleton): """ return True if not _min_filesize or _size > _min_filesize * 1024 * 1024 else False + # 是否全部成功 + all_success = True + # 自定义格式 formaterHandler = FormatParser(eformat=epformat.format, details=epformat.detail, @@ -712,6 +725,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if not force: transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage) if transferd and transferd.status: + all_success = False logger.info(f"{file_item.path} 已成功整理过,如需重新处理,请删除历史记录。") continue @@ -726,6 +740,7 @@ class TransferChain(ChainBase, metaclass=Singleton): file_meta.begin_season = season if not file_meta: + all_success = False logger.error(f"{file_path} 无法识别有效信息") err_msgs.append(f"{file_path} 无法识别有效信息") continue @@ -777,6 +792,7 @@ class TransferChain(ChainBase, metaclass=Singleton): file_mediainfo = mediainfo if not file_mediainfo: + all_success = False logger.warn(f'{file_path} 未识别到媒体信息') # 新增整理失败历史记录 his = self.transferhis.add_fail( @@ -836,26 +852,37 @@ class TransferChain(ChainBase, metaclass=Singleton): target_storage=target_storage) # 后台整理 - self.put_to_queue( - task=TransferTask( - fileitem=file_item, - meta=file_meta, - mediainfo=file_mediainfo, - target_directory=target_directory or dir_info, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - episodes_info=episodes_info, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder, - downloader=downloader, - download_hash=download_hash + transfer_task = TransferTask( + fileitem=file_item, + meta=file_meta, + mediainfo=file_mediainfo, + target_directory=target_directory or dir_info, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + episodes_info=episodes_info, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + downloader=downloader, + download_hash=download_hash + ) + if background: + self.put_to_queue( + task=transfer_task ) - ) - logger.info(f"{file_path.name} 已添加到整理队列") + logger.info(f"{file_path.name} 已添加到整理队列") + else: + state, err_msg = self.__handle_transfer( + task=transfer_task, + callback=self.__default_callback + ) + if not state: + all_success = False + logger.warn(f"{file_path.name} 整理失败:{err_msg}") + err_msgs.append(err_msg) - return True, "\n".join(err_msgs) if err_msgs else f"{fileitem.name} 已添加到整理任务队列" + return all_success, "\n".join(err_msgs) def remote_transfer(self, arg_str: str, channel: MessageChannel, userid: Union[str, int] = None, source: str = None):