From 7abaf70bb8a70691bffff3e447516af764ef35a5 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 24 Jul 2025 09:54:46 +0800 Subject: [PATCH] fix workflow --- app/api/endpoints/workflow.py | 9 +++++++-- app/chain/workflow.py | 14 ++++++++++++++ app/core/workflow.py | 19 +++++++++++++------ app/db/models/workflow.py | 16 +++++++++++++++- app/db/workflow_oper.py | 6 ++++++ app/scheduler.py | 2 +- 6 files changed, 56 insertions(+), 10 deletions(-) diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 08282d28..b236e651 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -39,7 +39,6 @@ def create_workflow(workflow: schemas.Workflow, """ 创建工作流 """ - from app.db.workflow_oper import WorkflowOper if workflow.name and WorkflowOper(db).get_by_name(workflow.name): return schemas.Response(success=False, message="已存在相同名称的工作流") if not workflow.add_time: @@ -280,6 +279,12 @@ def update_workflow(workflow: schemas.Workflow, if not wf.trigger_type: workflow.trigger_type = "timer" wf.update(db, workflow.dict()) + # 更新后的工作流对象 + updated_workflow = wf.get(workflow.id) + # 更新定时任务 + Scheduler().update_workflow_job(updated_workflow) + # 更新事件注册 + WorkFlowManager().update_workflow_event(updated_workflow) return schemas.Response(success=True, message="更新成功") @@ -298,7 +303,7 @@ def delete_workflow(workflow_id: int, Scheduler().remove_workflow_job(workflow) else: # 事件触发:从事件触发器中移除 - WorkFlowManager().register_workflow_event(workflow_id, workflow.event_type) + WorkFlowManager().remove_workflow_event(workflow_id, workflow.event_type) # 删除工作流 Workflow.delete(db, workflow_id) # 删除缓存 diff --git a/app/chain/workflow.py b/app/chain/workflow.py index bf2e2d1d..5c5836da 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -259,3 +259,17 @@ class WorkflowChain(ChainBase): 获取工作流列表 """ return WorkflowOper().list_enabled() + + @staticmethod + def get_timer_workflows() -> List[Workflow]: + """ + 获取定时触发的工作流列表 + """ + return WorkflowOper().get_timer_triggered_workflows() + + @staticmethod + def get_event_workflows() -> List[Workflow]: + """ + 获取事件触发的工作流列表 + """ + return WorkflowOper().get_event_triggered_workflows() diff --git a/app/core/workflow.py b/app/core/workflow.py index 54473f42..48be9b29 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -5,6 +5,7 @@ from typing import List, Tuple from app.core.config import global_vars from app.core.event import eventmanager, Event +from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.helper.module import ModuleHelper from app.log import logger @@ -121,6 +122,17 @@ class WorkFlowManager(metaclass=Singleton): } for key, action in self._actions.items() ] + def update_workflow_event(self, workflow: Workflow): + """ + 更新工作流事件触发器 + """ + # 确保先移除旧的事件监听器 + self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type) + # 如果工作流是事件触发类型且未被禁用 + if workflow.trigger_type == "event" and workflow.state != 'P': + # 注册事件触发器 + self.register_workflow_event(workflow.id, workflow.event_type) + def load_workflow_events(self, workflow_id: Optional[int] = None): """ 加载工作流触发事件 @@ -134,12 +146,7 @@ class WorkFlowManager(metaclass=Singleton): workflows = WorkflowOper().get_event_triggered_workflows() try: for workflow in workflows: - # 确保先移除旧的事件监听器 - self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type) - # 如果工作流是事件触发类型且未被禁用 - if workflow.trigger_type == "event" and workflow.state != 'P': - # 注册事件触发器 - self.register_workflow_event(workflow.id, workflow.event_type) + self.update_workflow_event(workflow) except Exception as e: logger.error(f"加载事件触发工作流失败: {e}") diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 8d6668a7..7770c26f 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Optional -from sqlalchemy import Column, Integer, JSON, Sequence, String, and_ +from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_ from app.db import Base, db_query, db_update @@ -53,6 +53,20 @@ class Workflow(Base): def get_enabled_workflows(db): return db.query(Workflow).filter(Workflow.state != 'P').all() + @staticmethod + @db_query + def get_timer_triggered_workflows(db): + """获取定时触发的工作流""" + return db.query(Workflow).filter( + and_( + or_( + Workflow.trigger_type == 'timer', + not Workflow.trigger_type + ), + Workflow.state != 'P' + ) + ).all() + @staticmethod @db_query def get_event_triggered_workflows(db): diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index 5fa71cbc..d71b99aa 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -37,6 +37,12 @@ class WorkflowOper(DbOper): """ return Workflow.get_enabled_workflows(self._db) + def get_timer_triggered_workflows(self) -> List[Workflow]: + """ + 获取定时触发的工作流列表 + """ + return Workflow.get_timer_triggered_workflows(self._db) + def get_event_triggered_workflows(self) -> List[Workflow]: """ 获取事件触发的工作流列表 diff --git a/app/scheduler.py b/app/scheduler.py index 7f3d299e..aedbd42a 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -432,7 +432,7 @@ class Scheduler(metaclass=Singleton): """ 初始化工作流定时服务 """ - for workflow in WorkflowChain().get_workflows() or []: + for workflow in WorkflowChain().get_timer_workflows() or []: self.update_workflow_job(workflow) def remove_workflow_job(self, workflow: Workflow):