add ScanFileAction

This commit is contained in:
jxxghp
2025-02-28 21:11:51 +08:00
parent d68461a127
commit cce71f23e2
5 changed files with 129 additions and 26 deletions

View File

@@ -1,3 +1,5 @@
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
@@ -13,9 +15,9 @@ class AddDownloadParams(ActionParams):
"""
添加下载资源参数
"""
downloader: str = Field(None, description="下载器")
save_path: str = Field(None, description="保存路径")
only_lack: bool = Field(False, description="仅下载缺失的资源")
downloader: Optional[str] = Field(None, description="下载器")
save_path: Optional[str] = Field(None, description="保存路径")
only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源")
class AddDownloadAction(BaseAction):

View File

@@ -30,7 +30,7 @@ class FetchDownloadsAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "获取下载任务,更新任务状态"
return "获取下载队列中的任务状态"
@classmethod
@property

View File

@@ -42,7 +42,7 @@ class FetchTorrentsAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "根据关键字搜索站点种子资源"
return "搜索站点种子资源列表"
@classmethod
@property

78
app/actions/scan_file.py Normal file
View File

@@ -0,0 +1,78 @@
import copy
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars, settings
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.log import logger
class ScanFileParams(ActionParams):
"""
整理文件参数
"""
# 存储
storage: Optional[str] = Field("local", description="存储")
directory: Optional[str] = Field(None, description="目录")
class ScanFileAction(BaseAction):
"""
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
self.storagechain = StorageChain()
@classmethod
@property
def name(cls) -> str:
return "扫描目录"
@classmethod
@property
def description(cls) -> str:
return "扫描目录文件到队列"
@classmethod
@property
def data(cls) -> dict:
return ScanFileParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
扫描目录中的所有文件记录到fileitems
"""
params = ScanFileParams(**params)
if not params.storage or not params.directory:
return context
fileitem = self.storagechain.get_file_item(params.storage, Path(params.directory))
if not fileitem:
logger.error(f"目录不存在: 【{params.storage}{params.directory}")
self._has_error = True
return context
files = self.storagechain.list_files(fileitem, recursion=True)
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
self._fileitems.append(fileitem)
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done()
return context

View File

@@ -1,4 +1,8 @@
import copy
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
@@ -12,7 +16,8 @@ class TransferFileParams(ActionParams):
"""
整理文件参数
"""
pass
# 来源
source: Optional[str] = Field("downloads", description="来源")
class TransferFileAction(BaseAction):
@@ -36,7 +41,7 @@ class TransferFileAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "整理下载队列中的文件"
return "整理队列中的文件"
@classmethod
@property
@@ -49,26 +54,44 @@ class TransferFileAction(BaseAction):
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
从downloads中整理文件记录到fileitems
downloads / fileitems 中整理文件记录到fileitems
"""
for download in context.downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
params = TransferFileParams(**params)
if params.source == "downloads":
# 从下载任务中整理文件
for download in context.downloads:
if global_vars.is_workflow_stopped(workflow_id):
break
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
else:
# 从 fileitems 中整理文件
for fileitem in copy.deepcopy(context.fileitems):
if global_vars.is_workflow_stopped(workflow_id):
break
logger.info(f"开始整理文件 {fileitem.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {fileitem.path} 完成")
# 从 fileitems 中移除已整理的文件
context.fileitems.remove(fileitem)
self._fileitems.append(fileitem)
if self._fileitems:
context.fileitems.extend(self._fileitems)