From cce71f23e29aeff38e569beb440d455ca86878dc Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 28 Feb 2025 21:11:51 +0800 Subject: [PATCH] add ScanFileAction --- app/actions/add_download.py | 8 ++-- app/actions/fetch_downloads.py | 2 +- app/actions/fetch_torrents.py | 2 +- app/actions/scan_file.py | 78 ++++++++++++++++++++++++++++++++++ app/actions/transfer_file.py | 65 +++++++++++++++++++--------- 5 files changed, 129 insertions(+), 26 deletions(-) create mode 100644 app/actions/scan_file.py diff --git a/app/actions/add_download.py b/app/actions/add_download.py index c0b6dc1c..072cef02 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -1,3 +1,5 @@ +from typing import Optional + from pydantic import Field from app.actions import BaseAction @@ -13,9 +15,9 @@ class AddDownloadParams(ActionParams): """ 添加下载资源参数 """ - downloader: str = Field(None, description="下载器") - save_path: str = Field(None, description="保存路径") - only_lack: bool = Field(False, description="仅下载缺失的资源") + downloader: Optional[str] = Field(None, description="下载器") + save_path: Optional[str] = Field(None, description="保存路径") + only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源") class AddDownloadAction(BaseAction): diff --git a/app/actions/fetch_downloads.py b/app/actions/fetch_downloads.py index 0ebdb023..605f91a4 100644 --- a/app/actions/fetch_downloads.py +++ b/app/actions/fetch_downloads.py @@ -30,7 +30,7 @@ class FetchDownloadsAction(BaseAction): @classmethod @property def description(cls) -> str: - return "获取下载任务,更新任务状态" + return "获取下载队列中的任务状态" @classmethod @property diff --git a/app/actions/fetch_torrents.py b/app/actions/fetch_torrents.py index cb76dff5..2666ecb2 100644 --- a/app/actions/fetch_torrents.py +++ b/app/actions/fetch_torrents.py @@ -42,7 +42,7 @@ class FetchTorrentsAction(BaseAction): @classmethod @property def description(cls) -> str: - return "根据关键字搜索站点种子资源" + return "搜索站点种子资源列表" @classmethod @property diff --git a/app/actions/scan_file.py b/app/actions/scan_file.py new file mode 100644 index 00000000..82b7f256 --- /dev/null +++ b/app/actions/scan_file.py @@ -0,0 +1,78 @@ +import copy +from pathlib import Path +from typing import Optional + +from pydantic import Field + +from app.actions import BaseAction +from app.core.config import global_vars, settings +from app.schemas import ActionParams, ActionContext +from app.chain.storage import StorageChain +from app.log import logger + + +class ScanFileParams(ActionParams): + """ + 整理文件参数 + """ + # 存储 + storage: Optional[str] = Field("local", description="存储") + directory: Optional[str] = Field(None, description="目录") + + +class ScanFileAction(BaseAction): + """ + 整理文件 + """ + + _fileitems = [] + _has_error = False + + def __init__(self): + super().__init__() + self.storagechain = StorageChain() + + @classmethod + @property + def name(cls) -> str: + return "扫描目录" + + @classmethod + @property + def description(cls) -> str: + return "扫描目录文件到队列" + + @classmethod + @property + def data(cls) -> dict: + return ScanFileParams().dict() + + @property + def success(self) -> bool: + return not self._has_error + + def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext: + """ + 扫描目录中的所有文件,记录到fileitems + """ + params = ScanFileParams(**params) + if not params.storage or not params.directory: + return context + fileitem = self.storagechain.get_file_item(params.storage, Path(params.directory)) + if not fileitem: + logger.error(f"目录不存在: 【{params.storage}】{params.directory}") + self._has_error = True + return context + files = self.storagechain.list_files(fileitem, recursion=True) + for file in files: + if global_vars.is_workflow_stopped(workflow_id): + break + if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT: + continue + self._fileitems.append(fileitem) + + if self._fileitems: + context.fileitems.extend(self._fileitems) + + self.job_done() + return context diff --git a/app/actions/transfer_file.py b/app/actions/transfer_file.py index 07512e75..e45f6586 100644 --- a/app/actions/transfer_file.py +++ b/app/actions/transfer_file.py @@ -1,4 +1,8 @@ +import copy from pathlib import Path +from typing import Optional + +from pydantic import Field from app.actions import BaseAction from app.core.config import global_vars @@ -12,7 +16,8 @@ class TransferFileParams(ActionParams): """ 整理文件参数 """ - pass + # 来源 + source: Optional[str] = Field("downloads", description="来源") class TransferFileAction(BaseAction): @@ -36,7 +41,7 @@ class TransferFileAction(BaseAction): @classmethod @property def description(cls) -> str: - return "整理下载队列中的文件" + return "整理队列中的文件" @classmethod @property @@ -49,26 +54,44 @@ class TransferFileAction(BaseAction): def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext: """ - 从downloads中整理文件,记录到fileitems + 从 downloads / fileitems 中整理文件,记录到fileitems """ - for download in context.downloads: - if global_vars.is_workflow_stopped(workflow_id): - break - if not download.completed: - logger.info(f"下载任务 {download.download_id} 未完成") - continue - fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path)) - if not fileitem: - logger.info(f"文件 {download.path} 不存在") - continue - logger.info(f"开始整理文件 {download.path} ...") - state, errmsg = self.transferchain.do_transfer(fileitem, background=False) - if not state: - self._has_error = True - logger.error(f"整理文件 {download.path} 失败: {errmsg}") - continue - logger.info(f"整理文件 {download.path} 完成") - self._fileitems.append(fileitem) + params = TransferFileParams(**params) + if params.source == "downloads": + # 从下载任务中整理文件 + for download in context.downloads: + if global_vars.is_workflow_stopped(workflow_id): + break + if not download.completed: + logger.info(f"下载任务 {download.download_id} 未完成") + continue + fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path)) + if not fileitem: + logger.info(f"文件 {download.path} 不存在") + continue + logger.info(f"开始整理文件 {download.path} ...") + state, errmsg = self.transferchain.do_transfer(fileitem, background=False) + if not state: + self._has_error = True + logger.error(f"整理文件 {download.path} 失败: {errmsg}") + continue + logger.info(f"整理文件 {download.path} 完成") + self._fileitems.append(fileitem) + else: + # 从 fileitems 中整理文件 + for fileitem in copy.deepcopy(context.fileitems): + if global_vars.is_workflow_stopped(workflow_id): + break + logger.info(f"开始整理文件 {fileitem.path} ...") + state, errmsg = self.transferchain.do_transfer(fileitem, background=False) + if not state: + self._has_error = True + logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}") + continue + logger.info(f"整理文件 {fileitem.path} 完成") + # 从 fileitems 中移除已整理的文件 + context.fileitems.remove(fileitem) + self._fileitems.append(fileitem) if self._fileitems: context.fileitems.extend(self._fileitems)