diff --git a/app/actions/__init__.py b/app/actions/__init__.py index 95116fa2..6b1c41fd 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -15,6 +15,8 @@ class BaseAction(ABC): # 完成标志 _done_flag = False + # 执行信息 + _message = "" @classmethod @property @@ -49,10 +51,18 @@ class BaseAction(ABC): """ pass - def job_done(self): + @property + def message(self) -> str: + """ + 执行信息 + """ + return self._message + + def job_done(self, message: str = None): """ 标记动作完成 """ + self._message = message self._done_flag = True @abstractmethod diff --git a/app/actions/add_download.py b/app/actions/add_download.py index 072cef02..b5486d28 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -104,5 +104,5 @@ class AddDownloadAction(BaseAction): [DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads] ) - self.job_done() + self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务") return context diff --git a/app/actions/add_subscribe.py b/app/actions/add_subscribe.py index c542c2e9..779b8d23 100644 --- a/app/actions/add_subscribe.py +++ b/app/actions/add_subscribe.py @@ -77,5 +77,5 @@ class AddSubscribeAction(BaseAction): for sid in self._added_subscribes: context.subscribes.append(self.subscribeoper.get(sid)) - self.job_done() + self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅") return context diff --git a/app/actions/fetch_medias.py b/app/actions/fetch_medias.py index f60307d4..03910837 100644 --- a/app/actions/fetch_medias.py +++ b/app/actions/fetch_medias.py @@ -166,5 +166,5 @@ class FetchMediasAction(BaseAction): if self._medias: context.medias.extend(self._medias) - self.job_done() + self.job_done(f"获取到 {len(self._medias)} 条媒数据") return context diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index d11cad79..1c3fd5d3 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -105,8 +105,8 @@ class FetchRssAction(BaseAction): self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo)) if self._rss_torrents: - logger.info(f"已获取 {len(self._rss_torrents)} 个RSS资源") + logger.info(f"获取到 {len(self._rss_torrents)} 个RSS资源") context.torrents.extend(self._rss_torrents) - self.job_done() + self.job_done(f"获取到 {len(self._rss_torrents)} 个资源") return context diff --git a/app/actions/fetch_torrents.py b/app/actions/fetch_torrents.py index 2666ecb2..eee5a191 100644 --- a/app/actions/fetch_torrents.py +++ b/app/actions/fetch_torrents.py @@ -97,5 +97,5 @@ class FetchTorrentsAction(BaseAction): context.torrents.extend(self._torrents) logger.info(f"共搜索到 {len(self._torrents)} 条资源") - self.job_done() + self.job_done(f"搜索到 {len(self._torrents)} 个资源") return context diff --git a/app/actions/filter_medias.py b/app/actions/filter_medias.py index 24c74ce0..a1e3b5fe 100644 --- a/app/actions/filter_medias.py +++ b/app/actions/filter_medias.py @@ -64,5 +64,5 @@ class FilterMediasAction(BaseAction): if self._medias: context.medias = self._medias - self.job_done() + self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据") return context diff --git a/app/actions/filter_torrents.py b/app/actions/filter_torrents.py index c8f1c4f3..912df8f0 100644 --- a/app/actions/filter_torrents.py +++ b/app/actions/filter_torrents.py @@ -80,5 +80,5 @@ class FilterTorrentsAction(BaseAction): context.torrents = self._torrents - self.job_done() + self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源") return context diff --git a/app/actions/scan_file.py b/app/actions/scan_file.py index 82b7f256..224b2ce6 100644 --- a/app/actions/scan_file.py +++ b/app/actions/scan_file.py @@ -1,14 +1,13 @@ -import copy from pathlib import Path from typing import Optional from pydantic import Field from app.actions import BaseAction -from app.core.config import global_vars, settings -from app.schemas import ActionParams, ActionContext from app.chain.storage import StorageChain +from app.core.config import global_vars, settings from app.log import logger +from app.schemas import ActionParams, ActionContext class ScanFileParams(ActionParams): @@ -74,5 +73,5 @@ class ScanFileAction(BaseAction): if self._fileitems: context.fileitems.extend(self._fileitems) - self.job_done() + self.job_done(f"扫描到 {len(self._fileitems)} 个文件") return context diff --git a/app/actions/scrape_file.py b/app/actions/scrape_file.py index 3c55b4b2..a0123ff8 100644 --- a/app/actions/scrape_file.py +++ b/app/actions/scrape_file.py @@ -68,5 +68,5 @@ class ScrapeFileAction(BaseAction): self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo) self._scraped_files.append(fileitem) - self.job_done() + self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件") return context diff --git a/app/actions/send_event.py b/app/actions/send_event.py index 07c6d6d0..0b8ceecd 100644 --- a/app/actions/send_event.py +++ b/app/actions/send_event.py @@ -1,9 +1,7 @@ -import copy - from app.actions import BaseAction -from app.core.config import global_vars -from app.schemas import ActionParams, ActionContext from app.core.event import eventmanager +from app.schemas import ActionParams, ActionContext +from app.schemas.types import ChainEventType class SendEventParams(ActionParams): @@ -26,7 +24,7 @@ class SendEventAction(BaseAction): @classmethod @property def description(cls) -> str: - return "发送队列中的所有事件" + return "发送任务执行事件" @classmethod @property @@ -39,16 +37,12 @@ class SendEventAction(BaseAction): def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext: """ - 发送events中的事件 + 发送工作流事件,以更插件干预工作流执行 """ - if context.events: - # 按优先级排序,优先级高的先发送 - context.events.sort(key=lambda x: x.priority, reverse=True) - for event in copy.deepcopy(context.events): - if global_vars.is_workflow_stopped(workflow_id): - break - eventmanager.send_event(etype=event.event_type, data=event.event_data) - context.events.remove(event) + # 触发资源下载事件,更新执行上下文 + event = eventmanager.send_event(ChainEventType.WorkflowExecution, context) + if event and event.event_data: + context = event.event_data self.job_done() return context diff --git a/app/actions/send_message.py b/app/actions/send_message.py index d6a6b1de..16a960fb 100644 --- a/app/actions/send_message.py +++ b/app/actions/send_message.py @@ -1,11 +1,9 @@ -import copy from typing import List, Optional, Union from pydantic import Field from app.actions import BaseAction, ActionChain -from app.core.config import global_vars -from app.schemas import ActionParams, ActionContext +from app.schemas import ActionParams, ActionContext, Notification class SendMessageParams(ActionParams): @@ -33,7 +31,7 @@ class SendMessageAction(BaseAction): @classmethod @property def description(cls) -> str: - return "发送队列中的所有消息" + return "发送任务执行消息" @classmethod @property @@ -48,17 +46,28 @@ class SendMessageAction(BaseAction): """ 发送messages中的消息 """ - for message in copy.deepcopy(context.messages): - if global_vars.is_workflow_stopped(workflow_id): - break - if params.client: - message.source = params.client - if params.userid: - message.userid = params.userid - self.chain.post_message(message) - context.messages.remove(message) - - context.messages = [] + params = SendMessageParams(**params) + msg_text = f"当前进度:{context.progress}%" + index = 1 + if context.execute_history: + for history in context.execute_history: + if not history.message: + continue + msg_text += f"\n{index}. {history.action}:{history.message}" + index += 1 + # 发送消息 + if not params.client: + params.client = [None] + for client in params.client: + self.chain.post_message( + Notification( + source=client, + userid=params.userid, + title="【工作流执行结果】", + text=msg_text, + link="#/workflow" + ) + ) self.job_done() return context diff --git a/app/chain/workflow.py b/app/chain/workflow.py index c5961cf5..d237eae9 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -14,7 +14,7 @@ from app.core.workflow import WorkFlowManager from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger -from app.schemas import ActionContext, ActionFlow, Action +from app.schemas import ActionContext, ActionFlow, Action, ActionExecution class WorkflowExecutor: @@ -33,6 +33,8 @@ class WorkflowExecutor: self.step_callback = step_callback self.actions = {action['id']: Action(**action) for action in workflow.actions} self.flows = [ActionFlow(**flow) for flow in workflow.flows] + self.total_actions = len(self.actions) + self.finished_actions = 0 self.success = True self.errmsg = "" @@ -115,21 +117,35 @@ class WorkflowExecutor: ) future.add_done_callback(self.on_node_complete) - def execute_node(self, workflow_id: int, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]: + def execute_node(self, workflow_id: int, node_id: int, + context: ActionContext) -> Tuple[Action, bool, str, ActionContext]: """ 执行单个节点操作,返回修改后的上下文和节点ID """ action = self.actions[node_id] - state, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context) - return action, state, result_ctx + state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context) + return action, state, message, result_ctx def on_node_complete(self, future): """ 节点完成回调:更新上下文、处理后继节点 """ - action, state, result_ctx = future.result() + action, state, message, result_ctx = future.result() try: + self.finished_actions += 1 + # 更新当前进度 + self.context.progress = round(self.finished_actions / self.total_actions) * 100 + + # 补充执行历史 + self.context.execute_history.append( + ActionExecution( + action=action.name, + result=state, + message=message + ) + ) + # 节点执行失败 if not state: self.success = False diff --git a/app/core/workflow.py b/app/core/workflow.py index 01e640c3..18553ffe 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -55,7 +55,8 @@ class WorkFlowManager(metaclass=Singleton): """ pass - def excute(self, workflow_id: int, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]: + def excute(self, workflow_id: int, action: Action, + context: ActionContext = None) -> Tuple[bool, str, ActionContext]: """ 执行工作流动作 """ @@ -70,7 +71,7 @@ class WorkFlowManager(metaclass=Singleton): result_context = action_obj.execute(workflow_id, action.data, context) except Exception as err: logger.error(f"{action.name} 执行失败: {err}") - return False, context + return False, f"{err}", context loop = action.data.get("loop") loop_interval = action.data.get("loop_interval") if loop and loop_interval: @@ -87,10 +88,10 @@ class WorkFlowManager(metaclass=Singleton): logger.info(f"{action.name} 执行成功") else: logger.error(f"{action.name} 执行失败!") - return action_obj.success, result_context + return action_obj.success, action_obj.message, result_context else: logger.error(f"未找到动作: {action.type} - {action.name}") - return False, context + return False, " ", context def list_actions(self) -> List[dict]: """ diff --git a/app/schemas/types.py b/app/schemas/types.py index b24fd786..f177f8e7 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -87,6 +87,8 @@ class ChainEventType(Enum): MediaRecognizeConvert = "media.recognize.convert" # 推荐数据源 RecommendSource = "recommend.source" + # 工作流执行 + WorkflowExecution = "workflow.execution" # 系统配置Key字典 diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 4985c5a5..4934c1cb 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -3,12 +3,10 @@ from typing import Optional, List from pydantic import BaseModel, Field from app.schemas.context import Context, MediaInfo -from app.schemas.file import FileItem from app.schemas.download import DownloadTask +from app.schemas.file import FileItem from app.schemas.site import Site from app.schemas.subscribe import Subscribe -from app.schemas.message import Notification -from app.schemas.event import Event class Workflow(BaseModel): @@ -52,6 +50,15 @@ class Action(BaseModel): data: Optional[dict] = Field({}, description="参数") +class ActionExecution(BaseModel): + """ + 动作执行情况 + """ + action: Optional[str] = Field(None, description="当前动作(名称)") + result: Optional[bool] = Field(None, description="执行结果") + message: Optional[str] = Field(None, description="执行消息") + + class ActionContext(BaseModel): """ 动作基础上下文,各动作通用数据 @@ -63,8 +70,8 @@ class ActionContext(BaseModel): downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表") sites: Optional[List[Site]] = Field([], description="站点列表") subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表") - messages: Optional[List[Notification]] = Field([], description="消息列表") - events: Optional[List[Event]] = Field([], description="事件列表") + execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史") + progress: Optional[int] = Field(0, description="执行进度(%)") class ActionFlow(BaseModel):