fix 完善事件和消息发送

This commit is contained in:
jxxghp
2025-03-01 18:34:39 +08:00
parent ed8895dfbb
commit 8bd6ccb0de
16 changed files with 95 additions and 57 deletions

View File

@@ -15,6 +15,8 @@ class BaseAction(ABC):
# 完成标志
_done_flag = False
# 执行信息
_message = ""
@classmethod
@property
@@ -49,10 +51,18 @@ class BaseAction(ABC):
"""
pass
def job_done(self):
@property
def message(self) -> str:
"""
执行信息
"""
return self._message
def job_done(self, message: str = None):
"""
标记动作完成
"""
self._message = message
self._done_flag = True
@abstractmethod

View File

@@ -104,5 +104,5 @@ class AddDownloadAction(BaseAction):
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)
self.job_done()
self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务")
return context

View File

@@ -77,5 +77,5 @@ class AddSubscribeAction(BaseAction):
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
self.job_done()
self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅")
return context

View File

@@ -166,5 +166,5 @@ class FetchMediasAction(BaseAction):
if self._medias:
context.medias.extend(self._medias)
self.job_done()
self.job_done(f"获取到 {len(self._medias)} 条媒数据")
return context

View File

@@ -105,8 +105,8 @@ class FetchRssAction(BaseAction):
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))
if self._rss_torrents:
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
logger.info(f"获取 {len(self._rss_torrents)} 个RSS资源")
context.torrents.extend(self._rss_torrents)
self.job_done()
self.job_done(f"获取到 {len(self._rss_torrents)} 个资源")
return context

View File

@@ -97,5 +97,5 @@ class FetchTorrentsAction(BaseAction):
context.torrents.extend(self._torrents)
logger.info(f"共搜索到 {len(self._torrents)} 条资源")
self.job_done()
self.job_done(f"搜索到 {len(self._torrents)} 个资源")
return context

View File

@@ -64,5 +64,5 @@ class FilterMediasAction(BaseAction):
if self._medias:
context.medias = self._medias
self.job_done()
self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据")
return context

View File

@@ -80,5 +80,5 @@ class FilterTorrentsAction(BaseAction):
context.torrents = self._torrents
self.job_done()
self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源")
return context

View File

@@ -1,14 +1,13 @@
import copy
from pathlib import Path
from typing import Optional
from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars, settings
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.core.config import global_vars, settings
from app.log import logger
from app.schemas import ActionParams, ActionContext
class ScanFileParams(ActionParams):
@@ -74,5 +73,5 @@ class ScanFileAction(BaseAction):
if self._fileitems:
context.fileitems.extend(self._fileitems)
self.job_done()
self.job_done(f"扫描到 {len(self._fileitems)} 个文件")
return context

View File

@@ -68,5 +68,5 @@ class ScrapeFileAction(BaseAction):
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
self.job_done()
self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件")
return context

View File

@@ -1,9 +1,7 @@
import copy
from app.actions import BaseAction
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.core.event import eventmanager
from app.schemas import ActionParams, ActionContext
from app.schemas.types import ChainEventType
class SendEventParams(ActionParams):
@@ -26,7 +24,7 @@ class SendEventAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有事件"
return "发送任务执行事件"
@classmethod
@property
@@ -39,16 +37,12 @@ class SendEventAction(BaseAction):
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送events中的事件
发送工作流事件,以更插件干预工作流执行
"""
if context.events:
# 按优先级排序,优先级高的先发送
context.events.sort(key=lambda x: x.priority, reverse=True)
for event in copy.deepcopy(context.events):
if global_vars.is_workflow_stopped(workflow_id):
break
eventmanager.send_event(etype=event.event_type, data=event.event_data)
context.events.remove(event)
# 触发资源下载事件,更新执行上下文
event = eventmanager.send_event(ChainEventType.WorkflowExecution, context)
if event and event.event_data:
context = event.event_data
self.job_done()
return context

View File

@@ -1,11 +1,9 @@
import copy
from typing import List, Optional, Union
from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.schemas import ActionParams, ActionContext, Notification
class SendMessageParams(ActionParams):
@@ -33,7 +31,7 @@ class SendMessageAction(BaseAction):
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有消息"
return "发送任务执行消息"
@classmethod
@property
@@ -48,17 +46,28 @@ class SendMessageAction(BaseAction):
"""
发送messages中的消息
"""
for message in copy.deepcopy(context.messages):
if global_vars.is_workflow_stopped(workflow_id):
break
if params.client:
message.source = params.client
if params.userid:
message.userid = params.userid
self.chain.post_message(message)
context.messages.remove(message)
context.messages = []
params = SendMessageParams(**params)
msg_text = f"当前进度:{context.progress}%"
index = 1
if context.execute_history:
for history in context.execute_history:
if not history.message:
continue
msg_text += f"\n{index}. {history.action}{history.message}"
index += 1
# 发送消息
if not params.client:
params.client = [None]
for client in params.client:
self.chain.post_message(
Notification(
source=client,
userid=params.userid,
title="【工作流执行结果】",
text=msg_text,
link="#/workflow"
)
)
self.job_done()
return context

View File

@@ -14,7 +14,7 @@ from app.core.workflow import WorkFlowManager
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import ActionContext, ActionFlow, Action
from app.schemas import ActionContext, ActionFlow, Action, ActionExecution
class WorkflowExecutor:
@@ -33,6 +33,8 @@ class WorkflowExecutor:
self.step_callback = step_callback
self.actions = {action['id']: Action(**action) for action in workflow.actions}
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
self.total_actions = len(self.actions)
self.finished_actions = 0
self.success = True
self.errmsg = ""
@@ -115,21 +117,35 @@ class WorkflowExecutor:
)
future.add_done_callback(self.on_node_complete)
def execute_node(self, workflow_id: int, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
def execute_node(self, workflow_id: int, node_id: int,
context: ActionContext) -> Tuple[Action, bool, str, ActionContext]:
"""
执行单个节点操作返回修改后的上下文和节点ID
"""
action = self.actions[node_id]
state, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, result_ctx
state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, message, result_ctx
def on_node_complete(self, future):
"""
节点完成回调:更新上下文、处理后继节点
"""
action, state, result_ctx = future.result()
action, state, message, result_ctx = future.result()
try:
self.finished_actions += 1
# 更新当前进度
self.context.progress = round(self.finished_actions / self.total_actions) * 100
# 补充执行历史
self.context.execute_history.append(
ActionExecution(
action=action.name,
result=state,
message=message
)
)
# 节点执行失败
if not state:
self.success = False

View File

@@ -55,7 +55,8 @@ class WorkFlowManager(metaclass=Singleton):
"""
pass
def excute(self, workflow_id: int, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
def excute(self, workflow_id: int, action: Action,
context: ActionContext = None) -> Tuple[bool, str, ActionContext]:
"""
执行工作流动作
"""
@@ -70,7 +71,7 @@ class WorkFlowManager(metaclass=Singleton):
result_context = action_obj.execute(workflow_id, action.data, context)
except Exception as err:
logger.error(f"{action.name} 执行失败: {err}")
return False, context
return False, f"{err}", context
loop = action.data.get("loop")
loop_interval = action.data.get("loop_interval")
if loop and loop_interval:
@@ -87,10 +88,10 @@ class WorkFlowManager(metaclass=Singleton):
logger.info(f"{action.name} 执行成功")
else:
logger.error(f"{action.name} 执行失败!")
return action_obj.success, result_context
return action_obj.success, action_obj.message, result_context
else:
logger.error(f"未找到动作: {action.type} - {action.name}")
return False, context
return False, " ", context
def list_actions(self) -> List[dict]:
"""

View File

@@ -87,6 +87,8 @@ class ChainEventType(Enum):
MediaRecognizeConvert = "media.recognize.convert"
# 推荐数据源
RecommendSource = "recommend.source"
# 工作流执行
WorkflowExecution = "workflow.execution"
# 系统配置Key字典

View File

@@ -3,12 +3,10 @@ from typing import Optional, List
from pydantic import BaseModel, Field
from app.schemas.context import Context, MediaInfo
from app.schemas.file import FileItem
from app.schemas.download import DownloadTask
from app.schemas.file import FileItem
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
from app.schemas.event import Event
class Workflow(BaseModel):
@@ -52,6 +50,15 @@ class Action(BaseModel):
data: Optional[dict] = Field({}, description="参数")
class ActionExecution(BaseModel):
"""
动作执行情况
"""
action: Optional[str] = Field(None, description="当前动作(名称)")
result: Optional[bool] = Field(None, description="执行结果")
message: Optional[str] = Field(None, description="执行消息")
class ActionContext(BaseModel):
"""
动作基础上下文,各动作通用数据
@@ -63,8 +70,8 @@ class ActionContext(BaseModel):
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
events: Optional[List[Event]] = Field([], description="事件列表")
execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史")
progress: Optional[int] = Field(0, description="执行进度(%")
class ActionFlow(BaseModel):