Files
MoviePilot/app/workflow/actions/scan_file.py
2026-01-21 17:59:18 +08:00

79 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.workflow.actions import BaseAction
from app.chain.storage import StorageChain
from app.core.config import global_vars, settings
from app.log import logger
from app.schemas import ActionParams, ActionContext
class ScanFileParams(ActionParams):
"""
整理文件参数
"""
# 存储
storage: Optional[str] = Field(default="local", description="存储")
directory: Optional[str] = Field(default=None, description="目录")
class ScanFileAction(BaseAction):
"""
整理文件
"""
def __init__(self, action_id: str):
super().__init__(action_id)
self._fileitems = []
self._has_error = False
@classmethod
@property
def name(cls) -> str: # noqa
return "扫描目录"
@classmethod
@property
def description(cls) -> str: # noqa
return "扫描目录文件到队列"
@classmethod
@property
def data(cls) -> dict: # noqa
return ScanFileParams().model_dump()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
扫描目录中的所有文件记录到fileitems
"""
params = ScanFileParams(**params)
if not params.storage or not params.directory:
return context
storagechain = StorageChain()
fileitem = storagechain.get_file_item(params.storage, Path(params.directory))
if not fileitem:
logger.error(f"目录不存在: 【{params.storage}{params.directory}")
self._has_error = True
return context
files = storagechain.list_files(fileitem, recursion=True)
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if not file.extension or f".{file.extension.lower()}" not in media_exts:
continue
# 添加文件到队列,而不是目录
self._fileitems.append(file)
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done(f"扫描到 {len(self._fileitems)} 个文件")
return context