diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 6bdc8c56..3ea0e4b7 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1,14 +1,156 @@ -from datetime import datetime +import threading +from collections import defaultdict, deque +from concurrent.futures import ThreadPoolExecutor from typing import List, Tuple -from concurrent.futures import ThreadPoolExecutor, as_completed - from app.chain import ChainBase from app.core.config import global_vars from app.core.workflow import WorkFlowManager +from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger -from app.schemas import Workflow, ActionContext, Action, ActionFlow +from app.schemas import ActionContext, ActionFlow, Action + + +class WorkflowExecutor: + """ + 工作流执行器 + """ + + def __init__(self, workflow: Workflow): + """ + 初始化工作流执行器 + """ + self.workflowoper = WorkflowOper() + self.workflowmanager = WorkFlowManager() + + self.workflow = workflow + self.actions = {action['id']: Action(**action) for action in workflow.actions} + self.flows = [ActionFlow(**flow) for flow in workflow.flows] + + self.success = True + self.errmsg = "" + + # 构建邻接表、入度表 + self.adjacency = defaultdict(list) + self.indegree = defaultdict(int) + for flow in self.flows: + source = flow.source + target = flow.target + self.adjacency[source].append(target) + self.indegree[target] += 1 + + # 初始化所有节点的入度(确保未被引用的节点入度为0) + for action_id in self.actions: + if action_id not in self.indegree: + self.indegree[action_id] = 0 + + # 初始上下文 + if workflow.current_action: + self.context = ActionContext(**workflow.context) + else: + self.context = ActionContext() + + # 线程安全队列 + self.queue = deque() + # 锁用于保证线程安全 + self.lock = threading.Lock() + # 线程池 + self.executor = ThreadPoolExecutor() + # 跟踪运行中的任务数 + self.running_tasks = 0 + + # 初始化队列:入度为0的节点 + for action_id in self.actions: + if self.indegree[action_id] == 0: + self.queue.append(action_id) + + def execute(self): + """ + 执行工作流 + """ + while True: + with self.lock: + # 退出条件:队列为空且无运行任务 + if not self.queue and self.running_tasks == 0: + break + if not self.queue: + continue + node_id = self.queue.popleft() + # 标记任务开始 + self.running_tasks += 1 + + # 已停机 + if not global_vars.is_system_stopped: + break + + # 已执行的跳过 + if (self.workflow.current_action + and str(node_id) in self.workflow.current_action.split(',')): + continue + + # 提交任务到线程池 + future = self.executor.submit( + self.execute_node, + node_id, + self.context + ) + future.add_done_callback(self.on_node_complete) + + def execute_node(self, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]: + """ + 执行单个节点操作,返回修改后的上下文和节点ID + """ + action = self.actions[node_id] + state, result_ctx = self.workflowmanager.excute(action, context) + return action, state, result_ctx + + def on_node_complete(self, future): + """ + 节点完成回调:更新上下文、处理后继节点 + """ + action, state, result_ctx = future.result() + + # 节点执行失败 + if not state: + self.success = False + self.errmsg = f"{action.name} 执行失败" + return + + # 更新主上下文 + with self.lock: + self.merge_context(result_ctx) + self.save_step(action) + + # 处理后继节点 + successors = self.adjacency.get(action.id, []) + for succ_id in successors: + with self.lock: + self.indegree[succ_id] -= 1 + print(f"节点 {succ_id} 入度减至 {self.indegree[succ_id]}") + if self.indegree[succ_id] == 0: + self.queue.append(succ_id) + + # 标记任务完成 + with self.lock: + self.running_tasks -= 1 + + def merge_context(self, context: ActionContext): + """ + 合并上下文 + """ + # 遍历上下文,补充缺失的字段 + self_context_dict = self.context.dict() + for key, value in context.dict().items(): + if key not in self_context_dict: + self_context_dict[key] = value + self.context = ActionContext(**self_context_dict) + + def save_step(self, node_id: int): + """ + 保存上下文到数据库 + """ + self.workflowoper.step(self.workflow.id, action_id=node_id, context=self.context.dict()) class WorkflowChain(ChainBase): @@ -17,9 +159,8 @@ class WorkflowChain(ChainBase): """ def __init__(self): - super().__init__() self.workflowoper = WorkflowOper() - self.workflowmanager = WorkFlowManager() + super().__init__() def process(self, workflow_id: int, from_begin: bool = True) -> Tuple[bool, str]: """ @@ -27,31 +168,9 @@ class WorkflowChain(ChainBase): :param workflow_id: 工作流ID :param from_begin: 是否从头开始,默认为True """ - - _init_action = None - - def __get_next_action(_workflow: Workflow, _action: str) -> List[Action]: - """ - 获取下一个动作 - """ - if not _action: - # 获取起点动作 - actions = [] - source = [f.source for f in _workflow.flows] - target = [f.target for f in _workflow.flows] - for act in _workflow.actions: - if act.id not in target and act.id in source: - actions.append(Action(**act)) - return actions - else: - if _action == _init_action: - # 返回当前动作 - action_ids = _action.split(',') - return [Action(**act) for act in _workflow.actions if act.id in action_ids] - else: - # 获取下一个动作 - flows = [ActionFlow(**f) for f in _workflow.flows if f.source == _action] - return [Action(**act) for act in _workflow.actions if act.id in [f.target for f in flows]] + # 重置工作流 + if from_begin: + self.workflowoper.reset(workflow_id) # 查询工作流数据 workflow = self.workflowoper.get(workflow_id) @@ -68,77 +187,18 @@ class WorkflowChain(ChainBase): logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") self.workflowoper.start(workflow_id) - # 启用上下文 - if not from_begin and workflow.current_action: - _init_action = workflow.current_action - context = ActionContext(**workflow.context) + # 执行工作流 + executor = WorkflowExecutor(workflow) + executor.execute() + + if executor.success: + logger.info(f"工作流 {workflow.name} 执行失败:{executor.errmsg}") + self.workflowoper.fail(workflow_id, result=executor.errmsg) + return False, executor.errmsg else: - context = ActionContext() - - if from_begin: - current_action = None - else: - current_action = _init_action - - # 循环执行 - while next_actions := __get_next_action(workflow, current_action): - if global_vars.is_system_stopped: - break - if not next_actions: - break - # 获取下一个动作 - if len(next_actions) > 1: - # 多个下一步动作 - current_action = ",".join([act.id for act in next_actions]) - # 动作名称 - current_acttion_names = "、".join([act.name for act in next_actions]) - # 开始计时 - start_time = datetime.now() - # 多个下一步动作,多线程并发执行,等待结果 - executor = ThreadPoolExecutor(max_workers=len(next_actions)) - all_task = [] - for action in next_actions: - task = executor.submit(self.workflowmanager.excute, action, context) - all_task.append(task) - # 等待结果 - success_count = 0 - for future in as_completed(all_task): - state, context = future.result() - if state: - success_count += 1 - # 计算耗时 - end_time = datetime.now() - # 记录步骤 - self.workflowoper.step(workflow_id, - action=current_action, - context=context.dict()) - if success_count < len(next_actions): - logger.error(f"动作 {current_acttion_names} 未全部成功,工作流失败") - self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 未全部成功") - return False, f"动作 {current_acttion_names} 未全部成功" - else: - logger.info(f"动作 {current_acttion_names} 执行完成,耗时:{(end_time - start_time).seconds} 秒") - else: - # 单个下一步动作 - action = next_actions[0] - current_action = action.id - # 开始计时 - start_time = datetime.now() - # 执行动作 - state, context = self.workflowmanager.excute(action, context) - # 计算耗时 - end_time = datetime.now() - # 记录步骤 - self.workflowoper.step(workflow_id, action=current_action, context=context.dict()) - if not state: - logger.error(f"动作 {action.name} 执行失败,工作流失败") - self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败") - return False, f"动作 {action.name} 执行失败" - logger.info(f"动作 {action.name} 执行完成,耗时:{(end_time - start_time).seconds} 秒") - - logger.info(f"工作流 {workflow.name} 执行完成") - self.workflowoper.success(workflow_id) - return True, "" + logger.info(f"工作流 {workflow.name} 执行成功") + self.workflowoper.success(workflow_id) + return True, "" def get_workflows(self) -> List[Workflow]: """ diff --git a/app/core/workflow.py b/app/core/workflow.py index 363730a3..7fa57fb6 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -57,24 +57,30 @@ class WorkFlowManager(metaclass=Singleton): """ if not context: context = ActionContext() - if action.id in self._actions: + if action.type in self._actions: # 实例化 - action_obj = self._actions[action.id]() + action_obj = self._actions[action.type]() # 执行 logger.info(f"执行动作: {action.id} - {action.name}") result_context = action_obj.execute(action.params, context) - logger.info(f"{action.name} 执行结果: {action_obj.success}") + if action_obj.success: + logger.info(f"{action.name} 执行成功") + else: + logger.error(f"{action.name} 执行失败") if action.loop and action.loop_interval: while not action_obj.done: # 等待 - logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行") + logger.info(f"{action.name} 等待 {action.loop_interval} 秒后继续执行 ...") sleep(action.loop_interval) # 执行 logger.info(f"继续执行动作: {action.id} - {action.name}") result_context = action_obj.execute(action.params, result_context) - logger.info(f"{action.name} 执行结果: {action_obj.success}") + if action_obj.success: + logger.info(f"{action.name} 执行成功") + else: + logger.error(f"{action.name} 执行失败") logger.info(f"{action.name} 执行完成") return action_obj.success, result_context else: - logger.error(f"未找到动作: {action.id} - {action.name}") + logger.error(f"未找到动作: {action.type} - {action.name}") return False, context diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 795ea9d8..ab157454 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -66,7 +66,6 @@ class Workflow(Base): db.query(Workflow).filter(Workflow.id == wid).update({ "state": 'F', "result": result, - "run_count": Workflow.run_count + 1, "last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) return True @@ -84,6 +83,19 @@ class Workflow(Base): @staticmethod @db_update - def update_current_action(db, wid: int, action: str, context: dict): - db.query(Workflow).filter(Workflow.id == wid).update({"current_action": action, "context": context}) + def reset(db, wid: int): + db.query(Workflow).filter(Workflow.id == wid).update({ + "state": 'W', + "result": None, + "current_action": None, + }) + return True + + @staticmethod + @db_update + def update_current_action(db, wid: int, action_id: int, context: dict): + db.query(Workflow).filter(Workflow.id == wid).update({ + "current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id, + "context": context + }) return True diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index e73f9328..1827f052 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -55,8 +55,14 @@ class WorkflowOper(DbOper): """ return Workflow.fail(self._db, wid, result) - def step(self, wid: int, action: str, context: dict) -> bool: + def step(self, wid: int, action_id: int, context: dict) -> bool: """ 步进 """ - return Workflow.update_current_action(self._db, wid, action, context) + return Workflow.update_current_action(self._db, wid, action_id, context) + + def reset(self, wid: int) -> bool: + """ + 重置 + """ + return Workflow.reset(self._db, wid) diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 2dd34073..a90a7c49 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -43,13 +43,13 @@ class Action(BaseModel): """ 动作信息 """ - id: Optional[str] = Field(None, description="动作ID (类名)") + id: Optional[int] = Field(None, description="动作ID") + type: Optional[str] = Field(None, description="动作类型 (类名)") name: Optional[str] = Field(None, description="动作名称") description: Optional[str] = Field(None, description="动作描述") loop: Optional[bool] = Field(False, description="是否需要循环") loop_interval: Optional[int] = Field(0, description="循环间隔 (秒)") params: Optional[ActionParams] = Field({}, description="参数") - type: Optional[str] = Field(None, description="类型") label: Optional[str] = Field(None, description="标签") position: Optional[dict] = Field({}, description="位置")