diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 5307ef98..0e49f140 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1,10 +1,14 @@ +from datetime import datetime from typing import List +from concurrent.futures import ThreadPoolExecutor, as_completed + from app.chain import ChainBase +from app.core.config import global_vars from app.core.workflow import WorkFlowManager from app.db.workflow_oper import WorkflowOper from app.log import logger -from app.schemas import Workflow, ActionContext, Action +from app.schemas import Workflow, ActionContext, Action, ActionFlow class WorkflowChain(ChainBase): @@ -23,6 +27,31 @@ class WorkflowChain(ChainBase): :param workflow_id: 工作流ID :param from_begin: 是否从头开始,默认为True """ + + def __get_next_action(_workflow: Workflow, _action: str) -> List[Action]: + """ + 获取下一个动作 + """ + if not _action: + # 获取起点动作 + actions = [] + source = [f.source for f in _workflow.flows] + target = [f.target for f in _workflow.flows] + for act in _workflow.actions: + if act.id not in target and act.id in source: + actions.append(Action(**act)) + return actions + else: + if _action == _workflow.current_action: + # 返回当前动作 + action_ids = _action.split(',') + return [Action(**act) for act in _workflow.actions if act.id in action_ids] + else: + # 获取下一个动作 + flows = [ActionFlow(**f) for f in _workflow.flows if f.source == _action] + return [Action(**act) for act in _workflow.actions if act.id in [f.target for f in flows]] + + # 查询工作流数据 workflow = self.workflowoper.get(workflow_id) if not workflow: logger.warn(f"工作流 {workflow_id} 不存在") @@ -30,23 +59,79 @@ class WorkflowChain(ChainBase): if not workflow.actions: logger.warn(f"工作流 {workflow.name} 无动作") return False + if not workflow.flows: + logger.warn(f"工作流 {workflow.name} 无流程") + return False + logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") + # 启用上下文 if not from_begin and workflow.current_action: context = ActionContext(**workflow.context) else: context = ActionContext() - self.workflowoper.start(workflow_id) - for act in workflow.actions: - if not from_begin and act['id'] != workflow.current_action: - continue - action = Action(**act) - state, context = self.workflowmanager.excute(action, context) - self.workflowoper.step(workflow_id, action=action.id, context=context.dict()) - if not state: - logger.error(f"动作 {action.name} 执行失败,工作流失败") - self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败") - return False + + if from_begin: + current_action = None + else: + current_action = workflow.current_action + + # 循环执行 + while next_actions := __get_next_action(workflow, current_action): + if global_vars.is_system_stopped: + break + if not next_actions: + break + # 获取下一个动作 + if len(next_actions) > 1: + # 多个下一步动作 + current_action = ",".join([act.id for act in next_actions]) + # 动作名称 + current_acttion_names = "、".join([act.name for act in next_actions]) + # 开始计时 + start_time = datetime.now() + # 多个下一步动作,多线程并发执行,等待结果 + executor = ThreadPoolExecutor(max_workers=len(next_actions)) + all_task = [] + for action in next_actions: + task = executor.submit(self.workflowmanager.excute, action, context) + all_task.append(task) + # 等待结果 + success_count = 0 + for future in as_completed(all_task): + state, context = future.result() + if state: + success_count += 1 + # 计算耗时 + end_time = datetime.now() + # 记录步骤 + self.workflowoper.step(workflow_id, + action=current_action, + context=context.dict()) + if success_count < len(next_actions): + logger.error(f"动作 {current_acttion_names} 未全部成功,工作流失败") + self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 执行失败") + return False + else: + logger.info(f"动作 {current_acttion_names} 执行完成,耗时:{(end_time - start_time).seconds} 秒") + else: + # 单个下一步动作 + action = next_actions[0] + current_action = action.id + # 开始计时 + start_time = datetime.now() + # 执行动作 + state, context = self.workflowmanager.excute(action, context) + # 计算耗时 + end_time = datetime.now() + # 记录步骤 + self.workflowoper.step(workflow_id, action=current_action, context=context.dict()) + if not state: + logger.error(f"动作 {action.name} 执行失败,工作流失败") + self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败") + return False + logger.info(f"动作 {action.name} 执行完成,耗时:{(end_time - start_time).seconds} 秒") + logger.info(f"工作流 {workflow.name} 执行完成") self.workflowoper.success(workflow_id) return True diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index a85d9c7c..795ea9d8 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -27,6 +27,8 @@ class Workflow(Base): run_count = Column(Integer, default=0) # 任务列表 actions = Column(JSON, default=list) + # 任务流 + flows = Column(JSON, default=list) # 执行上下文 context = Column(JSON, default=dict) # 创建时间 diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index c58ebb85..1dfaaae3 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -24,6 +24,7 @@ class Workflow(BaseModel): result: Optional[str] = Field(None, description="任务执行结果") run_count: Optional[int] = Field(0, description="已执行次数") actions: Optional[list] = Field([], description="任务列表") + flows: Optional[list] = Field([], description="任务流") add_time: Optional[str] = Field(None, description="创建时间") last_time: Optional[str] = Field(None, description="最后执行时间") @@ -63,3 +64,12 @@ class ActionContext(BaseModel): subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表") messages: Optional[List[Notification]] = Field([], description="消息列表") events: Optional[List[Event]] = Field([], description="事件列表") + + +class ActionFlow(BaseModel): + """ + 工作流流程 + """ + id: Optional[str] = Field(None, description="流程ID") + source: Optional[str] = Field(None, description="源动作") + target: Optional[str] = Field(None, description="目标动作") diff --git a/database/versions/610bb05ddeef_2_1_2.py b/database/versions/610bb05ddeef_2_1_2.py new file mode 100644 index 00000000..a38338b9 --- /dev/null +++ b/database/versions/610bb05ddeef_2_1_2.py @@ -0,0 +1,29 @@ +"""2.1.2 + +Revision ID: 610bb05ddeef +Revises: 279a949d81b6 +Create Date: 2025-02-24 07:52:00.042837 + +""" +import contextlib + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import sqlite + +# revision identifiers, used by Alembic. +revision = '610bb05ddeef' +down_revision = '279a949d81b6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with contextlib.suppress(Exception): + op.add_column('workflow', sa.Column('flows', sa.JSON(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + pass