mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-13 17:52:28 +08:00
add workflow executor
This commit is contained in:
@@ -1,14 +1,156 @@
|
||||
from datetime import datetime
|
||||
import threading
|
||||
from collections import defaultdict, deque
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import List, Tuple
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from app.chain import ChainBase
|
||||
from app.core.config import global_vars
|
||||
from app.core.workflow import WorkFlowManager
|
||||
from app.db.models import Workflow
|
||||
from app.db.workflow_oper import WorkflowOper
|
||||
from app.log import logger
|
||||
from app.schemas import Workflow, ActionContext, Action, ActionFlow
|
||||
from app.schemas import ActionContext, ActionFlow, Action
|
||||
|
||||
|
||||
class WorkflowExecutor:
|
||||
"""
|
||||
工作流执行器
|
||||
"""
|
||||
|
||||
def __init__(self, workflow: Workflow):
|
||||
"""
|
||||
初始化工作流执行器
|
||||
"""
|
||||
self.workflowoper = WorkflowOper()
|
||||
self.workflowmanager = WorkFlowManager()
|
||||
|
||||
self.workflow = workflow
|
||||
self.actions = {action['id']: Action(**action) for action in workflow.actions}
|
||||
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
|
||||
|
||||
self.success = True
|
||||
self.errmsg = ""
|
||||
|
||||
# 构建邻接表、入度表
|
||||
self.adjacency = defaultdict(list)
|
||||
self.indegree = defaultdict(int)
|
||||
for flow in self.flows:
|
||||
source = flow.source
|
||||
target = flow.target
|
||||
self.adjacency[source].append(target)
|
||||
self.indegree[target] += 1
|
||||
|
||||
# 初始化所有节点的入度(确保未被引用的节点入度为0)
|
||||
for action_id in self.actions:
|
||||
if action_id not in self.indegree:
|
||||
self.indegree[action_id] = 0
|
||||
|
||||
# 初始上下文
|
||||
if workflow.current_action:
|
||||
self.context = ActionContext(**workflow.context)
|
||||
else:
|
||||
self.context = ActionContext()
|
||||
|
||||
# 线程安全队列
|
||||
self.queue = deque()
|
||||
# 锁用于保证线程安全
|
||||
self.lock = threading.Lock()
|
||||
# 线程池
|
||||
self.executor = ThreadPoolExecutor()
|
||||
# 跟踪运行中的任务数
|
||||
self.running_tasks = 0
|
||||
|
||||
# 初始化队列:入度为0的节点
|
||||
for action_id in self.actions:
|
||||
if self.indegree[action_id] == 0:
|
||||
self.queue.append(action_id)
|
||||
|
||||
def execute(self):
|
||||
"""
|
||||
执行工作流
|
||||
"""
|
||||
while True:
|
||||
with self.lock:
|
||||
# 退出条件:队列为空且无运行任务
|
||||
if not self.queue and self.running_tasks == 0:
|
||||
break
|
||||
if not self.queue:
|
||||
continue
|
||||
node_id = self.queue.popleft()
|
||||
# 标记任务开始
|
||||
self.running_tasks += 1
|
||||
|
||||
# 已停机
|
||||
if not global_vars.is_system_stopped:
|
||||
break
|
||||
|
||||
# 已执行的跳过
|
||||
if (self.workflow.current_action
|
||||
and str(node_id) in self.workflow.current_action.split(',')):
|
||||
continue
|
||||
|
||||
# 提交任务到线程池
|
||||
future = self.executor.submit(
|
||||
self.execute_node,
|
||||
node_id,
|
||||
self.context
|
||||
)
|
||||
future.add_done_callback(self.on_node_complete)
|
||||
|
||||
def execute_node(self, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
|
||||
"""
|
||||
执行单个节点操作,返回修改后的上下文和节点ID
|
||||
"""
|
||||
action = self.actions[node_id]
|
||||
state, result_ctx = self.workflowmanager.excute(action, context)
|
||||
return action, state, result_ctx
|
||||
|
||||
def on_node_complete(self, future):
|
||||
"""
|
||||
节点完成回调:更新上下文、处理后继节点
|
||||
"""
|
||||
action, state, result_ctx = future.result()
|
||||
|
||||
# 节点执行失败
|
||||
if not state:
|
||||
self.success = False
|
||||
self.errmsg = f"{action.name} 执行失败"
|
||||
return
|
||||
|
||||
# 更新主上下文
|
||||
with self.lock:
|
||||
self.merge_context(result_ctx)
|
||||
self.save_step(action)
|
||||
|
||||
# 处理后继节点
|
||||
successors = self.adjacency.get(action.id, [])
|
||||
for succ_id in successors:
|
||||
with self.lock:
|
||||
self.indegree[succ_id] -= 1
|
||||
print(f"节点 {succ_id} 入度减至 {self.indegree[succ_id]}")
|
||||
if self.indegree[succ_id] == 0:
|
||||
self.queue.append(succ_id)
|
||||
|
||||
# 标记任务完成
|
||||
with self.lock:
|
||||
self.running_tasks -= 1
|
||||
|
||||
def merge_context(self, context: ActionContext):
|
||||
"""
|
||||
合并上下文
|
||||
"""
|
||||
# 遍历上下文,补充缺失的字段
|
||||
self_context_dict = self.context.dict()
|
||||
for key, value in context.dict().items():
|
||||
if key not in self_context_dict:
|
||||
self_context_dict[key] = value
|
||||
self.context = ActionContext(**self_context_dict)
|
||||
|
||||
def save_step(self, node_id: int):
|
||||
"""
|
||||
保存上下文到数据库
|
||||
"""
|
||||
self.workflowoper.step(self.workflow.id, action_id=node_id, context=self.context.dict())
|
||||
|
||||
|
||||
class WorkflowChain(ChainBase):
|
||||
@@ -17,9 +159,8 @@ class WorkflowChain(ChainBase):
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.workflowoper = WorkflowOper()
|
||||
self.workflowmanager = WorkFlowManager()
|
||||
super().__init__()
|
||||
|
||||
def process(self, workflow_id: int, from_begin: bool = True) -> Tuple[bool, str]:
|
||||
"""
|
||||
@@ -27,31 +168,9 @@ class WorkflowChain(ChainBase):
|
||||
:param workflow_id: 工作流ID
|
||||
:param from_begin: 是否从头开始,默认为True
|
||||
"""
|
||||
|
||||
_init_action = None
|
||||
|
||||
def __get_next_action(_workflow: Workflow, _action: str) -> List[Action]:
|
||||
"""
|
||||
获取下一个动作
|
||||
"""
|
||||
if not _action:
|
||||
# 获取起点动作
|
||||
actions = []
|
||||
source = [f.source for f in _workflow.flows]
|
||||
target = [f.target for f in _workflow.flows]
|
||||
for act in _workflow.actions:
|
||||
if act.id not in target and act.id in source:
|
||||
actions.append(Action(**act))
|
||||
return actions
|
||||
else:
|
||||
if _action == _init_action:
|
||||
# 返回当前动作
|
||||
action_ids = _action.split(',')
|
||||
return [Action(**act) for act in _workflow.actions if act.id in action_ids]
|
||||
else:
|
||||
# 获取下一个动作
|
||||
flows = [ActionFlow(**f) for f in _workflow.flows if f.source == _action]
|
||||
return [Action(**act) for act in _workflow.actions if act.id in [f.target for f in flows]]
|
||||
# 重置工作流
|
||||
if from_begin:
|
||||
self.workflowoper.reset(workflow_id)
|
||||
|
||||
# 查询工作流数据
|
||||
workflow = self.workflowoper.get(workflow_id)
|
||||
@@ -68,77 +187,18 @@ class WorkflowChain(ChainBase):
|
||||
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
|
||||
self.workflowoper.start(workflow_id)
|
||||
|
||||
# 启用上下文
|
||||
if not from_begin and workflow.current_action:
|
||||
_init_action = workflow.current_action
|
||||
context = ActionContext(**workflow.context)
|
||||
# 执行工作流
|
||||
executor = WorkflowExecutor(workflow)
|
||||
executor.execute()
|
||||
|
||||
if executor.success:
|
||||
logger.info(f"工作流 {workflow.name} 执行失败:{executor.errmsg}")
|
||||
self.workflowoper.fail(workflow_id, result=executor.errmsg)
|
||||
return False, executor.errmsg
|
||||
else:
|
||||
context = ActionContext()
|
||||
|
||||
if from_begin:
|
||||
current_action = None
|
||||
else:
|
||||
current_action = _init_action
|
||||
|
||||
# 循环执行
|
||||
while next_actions := __get_next_action(workflow, current_action):
|
||||
if global_vars.is_system_stopped:
|
||||
break
|
||||
if not next_actions:
|
||||
break
|
||||
# 获取下一个动作
|
||||
if len(next_actions) > 1:
|
||||
# 多个下一步动作
|
||||
current_action = ",".join([act.id for act in next_actions])
|
||||
# 动作名称
|
||||
current_acttion_names = "、".join([act.name for act in next_actions])
|
||||
# 开始计时
|
||||
start_time = datetime.now()
|
||||
# 多个下一步动作,多线程并发执行,等待结果
|
||||
executor = ThreadPoolExecutor(max_workers=len(next_actions))
|
||||
all_task = []
|
||||
for action in next_actions:
|
||||
task = executor.submit(self.workflowmanager.excute, action, context)
|
||||
all_task.append(task)
|
||||
# 等待结果
|
||||
success_count = 0
|
||||
for future in as_completed(all_task):
|
||||
state, context = future.result()
|
||||
if state:
|
||||
success_count += 1
|
||||
# 计算耗时
|
||||
end_time = datetime.now()
|
||||
# 记录步骤
|
||||
self.workflowoper.step(workflow_id,
|
||||
action=current_action,
|
||||
context=context.dict())
|
||||
if success_count < len(next_actions):
|
||||
logger.error(f"动作 {current_acttion_names} 未全部成功,工作流失败")
|
||||
self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 未全部成功")
|
||||
return False, f"动作 {current_acttion_names} 未全部成功"
|
||||
else:
|
||||
logger.info(f"动作 {current_acttion_names} 执行完成,耗时:{(end_time - start_time).seconds} 秒")
|
||||
else:
|
||||
# 单个下一步动作
|
||||
action = next_actions[0]
|
||||
current_action = action.id
|
||||
# 开始计时
|
||||
start_time = datetime.now()
|
||||
# 执行动作
|
||||
state, context = self.workflowmanager.excute(action, context)
|
||||
# 计算耗时
|
||||
end_time = datetime.now()
|
||||
# 记录步骤
|
||||
self.workflowoper.step(workflow_id, action=current_action, context=context.dict())
|
||||
if not state:
|
||||
logger.error(f"动作 {action.name} 执行失败,工作流失败")
|
||||
self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败")
|
||||
return False, f"动作 {action.name} 执行失败"
|
||||
logger.info(f"动作 {action.name} 执行完成,耗时:{(end_time - start_time).seconds} 秒")
|
||||
|
||||
logger.info(f"工作流 {workflow.name} 执行完成")
|
||||
self.workflowoper.success(workflow_id)
|
||||
return True, ""
|
||||
logger.info(f"工作流 {workflow.name} 执行成功")
|
||||
self.workflowoper.success(workflow_id)
|
||||
return True, ""
|
||||
|
||||
def get_workflows(self) -> List[Workflow]:
|
||||
"""
|
||||
|
||||
@@ -57,24 +57,30 @@ class WorkFlowManager(metaclass=Singleton):
|
||||
"""
|
||||
if not context:
|
||||
context = ActionContext()
|
||||
if action.id in self._actions:
|
||||
if action.type in self._actions:
|
||||
# 实例化
|
||||
action_obj = self._actions[action.id]()
|
||||
action_obj = self._actions[action.type]()
|
||||
# 执行
|
||||
logger.info(f"执行动作: {action.id} - {action.name}")
|
||||
result_context = action_obj.execute(action.params, context)
|
||||
logger.info(f"{action.name} 执行结果: {action_obj.success}")
|
||||
if action_obj.success:
|
||||
logger.info(f"{action.name} 执行成功")
|
||||
else:
|
||||
logger.error(f"{action.name} 执行失败")
|
||||
if action.loop and action.loop_interval:
|
||||
while not action_obj.done:
|
||||
# 等待
|
||||
logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行")
|
||||
logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行 ...")
|
||||
sleep(action.loop_interval)
|
||||
# 执行
|
||||
logger.info(f"继续执行动作: {action.id} - {action.name}")
|
||||
result_context = action_obj.execute(action.params, result_context)
|
||||
logger.info(f"{action.name} 执行结果: {action_obj.success}")
|
||||
if action_obj.success:
|
||||
logger.info(f"{action.name} 执行成功")
|
||||
else:
|
||||
logger.error(f"{action.name} 执行失败")
|
||||
logger.info(f"{action.name} 执行完成")
|
||||
return action_obj.success, result_context
|
||||
else:
|
||||
logger.error(f"未找到动作: {action.id} - {action.name}")
|
||||
logger.error(f"未找到动作: {action.type} - {action.name}")
|
||||
return False, context
|
||||
|
||||
@@ -66,7 +66,6 @@ class Workflow(Base):
|
||||
db.query(Workflow).filter(Workflow.id == wid).update({
|
||||
"state": 'F',
|
||||
"result": result,
|
||||
"run_count": Workflow.run_count + 1,
|
||||
"last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
})
|
||||
return True
|
||||
@@ -84,6 +83,19 @@ class Workflow(Base):
|
||||
|
||||
@staticmethod
|
||||
@db_update
|
||||
def update_current_action(db, wid: int, action: str, context: dict):
|
||||
db.query(Workflow).filter(Workflow.id == wid).update({"current_action": action, "context": context})
|
||||
def reset(db, wid: int):
|
||||
db.query(Workflow).filter(Workflow.id == wid).update({
|
||||
"state": 'W',
|
||||
"result": None,
|
||||
"current_action": None,
|
||||
})
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
@db_update
|
||||
def update_current_action(db, wid: int, action_id: int, context: dict):
|
||||
db.query(Workflow).filter(Workflow.id == wid).update({
|
||||
"current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id,
|
||||
"context": context
|
||||
})
|
||||
return True
|
||||
|
||||
@@ -55,8 +55,14 @@ class WorkflowOper(DbOper):
|
||||
"""
|
||||
return Workflow.fail(self._db, wid, result)
|
||||
|
||||
def step(self, wid: int, action: str, context: dict) -> bool:
|
||||
def step(self, wid: int, action_id: int, context: dict) -> bool:
|
||||
"""
|
||||
步进
|
||||
"""
|
||||
return Workflow.update_current_action(self._db, wid, action, context)
|
||||
return Workflow.update_current_action(self._db, wid, action_id, context)
|
||||
|
||||
def reset(self, wid: int) -> bool:
|
||||
"""
|
||||
重置
|
||||
"""
|
||||
return Workflow.reset(self._db, wid)
|
||||
|
||||
@@ -43,13 +43,13 @@ class Action(BaseModel):
|
||||
"""
|
||||
动作信息
|
||||
"""
|
||||
id: Optional[str] = Field(None, description="动作ID (类名)")
|
||||
id: Optional[int] = Field(None, description="动作ID")
|
||||
type: Optional[str] = Field(None, description="动作类型 (类名)")
|
||||
name: Optional[str] = Field(None, description="动作名称")
|
||||
description: Optional[str] = Field(None, description="动作描述")
|
||||
loop: Optional[bool] = Field(False, description="是否需要循环")
|
||||
loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)")
|
||||
params: Optional[ActionParams] = Field({}, description="参数")
|
||||
type: Optional[str] = Field(None, description="类型")
|
||||
label: Optional[str] = Field(None, description="标签")
|
||||
position: Optional[dict] = Field({}, description="位置")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user