mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 20:17:22 +08:00
64 lines
1.1 KiB
Python
64 lines
1.1 KiB
Python
from abc import ABC, abstractmethod
|
|
|
|
from app.chain import ChainBase
|
|
from app.schemas import ActionContext, ActionParams
|
|
|
|
|
|
class ActionChain(ChainBase):
|
|
pass
|
|
|
|
|
|
class BaseAction(ABC):
|
|
"""
|
|
工作流动作基类
|
|
"""
|
|
|
|
# 完成标志
|
|
_done_flag = False
|
|
|
|
@classmethod
|
|
@property
|
|
@abstractmethod
|
|
def name(cls) -> str:
|
|
pass
|
|
|
|
@classmethod
|
|
@property
|
|
@abstractmethod
|
|
def description(cls) -> str:
|
|
pass
|
|
|
|
@classmethod
|
|
@property
|
|
@abstractmethod
|
|
def data(cls) -> dict:
|
|
pass
|
|
|
|
@property
|
|
def done(self) -> bool:
|
|
"""
|
|
判断动作是否完成
|
|
"""
|
|
return self._done_flag
|
|
|
|
@property
|
|
@abstractmethod
|
|
def success(self) -> bool:
|
|
"""
|
|
判断动作是否成功
|
|
"""
|
|
pass
|
|
|
|
def job_done(self):
|
|
"""
|
|
标记动作完成
|
|
"""
|
|
self._done_flag = True
|
|
|
|
@abstractmethod
|
|
def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext:
|
|
"""
|
|
执行动作
|
|
"""
|
|
raise NotImplementedError
|