fix workflow

This commit is contained in:
jxxghp
2025-07-24 09:54:46 +08:00
parent 232fe4d15e
commit 7abaf70bb8
6 changed files with 56 additions and 10 deletions

View File

@@ -39,7 +39,6 @@ def create_workflow(workflow: schemas.Workflow,
"""
创建工作流
"""
from app.db.workflow_oper import WorkflowOper
if workflow.name and WorkflowOper(db).get_by_name(workflow.name):
return schemas.Response(success=False, message="已存在相同名称的工作流")
if not workflow.add_time:
@@ -280,6 +279,12 @@ def update_workflow(workflow: schemas.Workflow,
if not wf.trigger_type:
workflow.trigger_type = "timer"
wf.update(db, workflow.dict())
# 更新后的工作流对象
updated_workflow = wf.get(workflow.id)
# 更新定时任务
Scheduler().update_workflow_job(updated_workflow)
# 更新事件注册
WorkFlowManager().update_workflow_event(updated_workflow)
return schemas.Response(success=True, message="更新成功")
@@ -298,7 +303,7 @@ def delete_workflow(workflow_id: int,
Scheduler().remove_workflow_job(workflow)
else:
# 事件触发:从事件触发器中移除
WorkFlowManager().register_workflow_event(workflow_id, workflow.event_type)
WorkFlowManager().remove_workflow_event(workflow_id, workflow.event_type)
# 删除工作流
Workflow.delete(db, workflow_id)
# 删除缓存

View File

@@ -259,3 +259,17 @@ class WorkflowChain(ChainBase):
获取工作流列表
"""
return WorkflowOper().list_enabled()
@staticmethod
def get_timer_workflows() -> List[Workflow]:
"""
获取定时触发的工作流列表
"""
return WorkflowOper().get_timer_triggered_workflows()
@staticmethod
def get_event_workflows() -> List[Workflow]:
"""
获取事件触发的工作流列表
"""
return WorkflowOper().get_event_triggered_workflows()

View File

@@ -5,6 +5,7 @@ from typing import List, Tuple
from app.core.config import global_vars
from app.core.event import eventmanager, Event
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.helper.module import ModuleHelper
from app.log import logger
@@ -121,6 +122,17 @@ class WorkFlowManager(metaclass=Singleton):
} for key, action in self._actions.items()
]
def update_workflow_event(self, workflow: Workflow):
"""
更新工作流事件触发器
"""
# 确保先移除旧的事件监听器
self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type)
# 如果工作流是事件触发类型且未被禁用
if workflow.trigger_type == "event" and workflow.state != 'P':
# 注册事件触发器
self.register_workflow_event(workflow.id, workflow.event_type)
def load_workflow_events(self, workflow_id: Optional[int] = None):
"""
加载工作流触发事件
@@ -134,12 +146,7 @@ class WorkFlowManager(metaclass=Singleton):
workflows = WorkflowOper().get_event_triggered_workflows()
try:
for workflow in workflows:
# 确保先移除旧的事件监听器
self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type)
# 如果工作流是事件触发类型且未被禁用
if workflow.trigger_type == "event" and workflow.state != 'P':
# 注册事件触发器
self.register_workflow_event(workflow.id, workflow.event_type)
self.update_workflow_event(workflow)
except Exception as e:
logger.error(f"加载事件触发工作流失败: {e}")

View File

@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_
from app.db import Base, db_query, db_update
@@ -53,6 +53,20 @@ class Workflow(Base):
def get_enabled_workflows(db):
return db.query(Workflow).filter(Workflow.state != 'P').all()
@staticmethod
@db_query
def get_timer_triggered_workflows(db):
"""获取定时触发的工作流"""
return db.query(Workflow).filter(
and_(
or_(
Workflow.trigger_type == 'timer',
not Workflow.trigger_type
),
Workflow.state != 'P'
)
).all()
@staticmethod
@db_query
def get_event_triggered_workflows(db):

View File

@@ -37,6 +37,12 @@ class WorkflowOper(DbOper):
"""
return Workflow.get_enabled_workflows(self._db)
def get_timer_triggered_workflows(self) -> List[Workflow]:
"""
获取定时触发的工作流列表
"""
return Workflow.get_timer_triggered_workflows(self._db)
def get_event_triggered_workflows(self) -> List[Workflow]:
"""
获取事件触发的工作流列表

View File

@@ -432,7 +432,7 @@ class Scheduler(metaclass=Singleton):
"""
初始化工作流定时服务
"""
for workflow in WorkflowChain().get_workflows() or []:
for workflow in WorkflowChain().get_timer_workflows() or []:
self.update_workflow_job(workflow)
def remove_workflow_job(self, workflow: Workflow):