feat:工作流支持事件触发

This commit is contained in:
jxxghp
2025-07-22 20:23:53 +08:00
parent 2f1e55fa1e
commit e39a130306
9 changed files with 311 additions and 18 deletions

View File

@@ -14,8 +14,10 @@ from app.db import get_db
from app.db.models import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.db.workflow_oper import WorkflowOper
from app.helper.workflow import WorkflowHelper
from app.scheduler import Scheduler
from app.schemas.types import EventType
router = APIRouter()
@@ -44,8 +46,9 @@ def create_workflow(workflow: schemas.Workflow,
workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
if not workflow.state:
workflow.state = "P"
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel(**workflow.dict()).create(db)
if not workflow.trigger_type:
workflow.trigger_type = "timer"
Workflow(**workflow.dict()).create(db)
return schemas.Response(success=True, message="创建工作流成功")
@@ -65,6 +68,17 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) ->
return WorkFlowManager().list_actions()
@router.get("/event_types", summary="获取所有事件类型", response_model=List[dict])
def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取所有事件类型
"""
return [{
"name": event_type.name,
"value": event_type.value
} for event_type in EventType]
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
workflow: schemas.WorkflowShare,
@@ -125,6 +139,9 @@ def workflow_fork(
"name": workflow.name,
"description": workflow.description,
"timer": workflow.timer,
"trigger_type": workflow.trigger_type or "timer",
"event_type": workflow.event_type,
"event_conditions": json.loads(workflow.event_conditions or "{}") if workflow.event_conditions else {},
"actions": actions,
"flows": flows,
"context": context,
@@ -182,8 +199,12 @@ def start_workflow(workflow_id: int,
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
Scheduler().update_workflow_job(workflow)
if not workflow.event_type or workflow.event_type == "timer":
# 添加定时任务
Scheduler().update_workflow_job(workflow)
else:
# 事件触发:添加到事件触发器
WorkFlowManager().load_workflow_events(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@@ -200,8 +221,13 @@ def pause_workflow(workflow_id: int,
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 根据触发类型进行不同处理
if workflow.trigger_type == "timer":
# 定时触发:移除定时任务
Scheduler().remove_workflow_job(workflow)
elif workflow.trigger_type == "event":
# 事件触发:从事件触发器中移除
WorkFlowManager().remove_workflow_event(workflow_id, workflow.event_type)
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 更新状态
@@ -247,12 +273,13 @@ def update_workflow(workflow: schemas.Workflow,
"""
更新工作流
"""
from app.db.workflow_oper import WorkflowOper
if not workflow.id:
return schemas.Response(success=False, message="工作流ID不能为空")
wf = WorkflowOper(db).get(workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
if not wf.event_type:
workflow.event_type = "timer"
wf.update(db, workflow.dict())
return schemas.Response(success=True, message="更新成功")
@@ -264,15 +291,17 @@ def delete_workflow(workflow_id: int,
"""
删除工作流
"""
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
if not workflow.event_type or workflow.event_type == "timer":
# 定时触发:删除定时任务
Scheduler().remove_workflow_job(workflow)
else:
# 事件触发:从事件触发器中移除
WorkFlowManager().register_workflow_event(workflow_id, workflow.event_type)
# 删除工作流
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel.delete(db, workflow_id)
Workflow.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")

View File

@@ -10,11 +10,13 @@ from pydantic.fields import Callable
from app.chain import ChainBase
from app.core.config import global_vars
from app.core.event import Event, eventmanager
from app.core.workflow import WorkFlowManager
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import ActionContext, ActionFlow, Action, ActionExecution
from app.schemas.types import EventType
class WorkflowExecutor:
@@ -81,6 +83,16 @@ class WorkflowExecutor:
if self.indegree[action_id] == 0:
self.queue.append(action_id)
@eventmanager.register(EventType.WorkflowExecute)
def event_execute(self, event: Event):
"""
事件触发工作流执行
"""
workflow_id = event.event_data.get('workflow_id')
if not workflow_id:
return
WorkflowChain.process(workflow_id, from_begin=False)
def execute(self):
"""
执行工作流
@@ -225,7 +237,7 @@ class WorkflowChain(ChainBase):
logger.warn(f"工作流 {workflow.name} 无流程")
return False, "工作流无流程"
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
logger.info(f"开始执行工作流 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
workflowoper.start(workflow_id)
# 执行工作流

View File

@@ -1,10 +1,15 @@
import threading
from time import sleep
from typing import Dict, Any, Tuple, List
from typing import Dict, Any, Optional
from typing import List, Tuple
from app.core.config import global_vars
from app.core.event import eventmanager, Event
from app.db.workflow_oper import WorkflowOper
from app.helper.module import ModuleHelper
from app.log import logger
from app.schemas import Action, ActionContext
from app.schemas import ActionContext, Action
from app.schemas.types import EventType
from app.utils.singleton import Singleton
@@ -15,7 +20,9 @@ class WorkFlowManager(metaclass=Singleton):
def __init__(self):
# 所有动作定义
self._lock = threading.Lock()
self._actions: Dict[str, Any] = {}
self._event_workflows: Dict[str, List[int]] = {}
self.init()
def init(self):
@@ -48,11 +55,15 @@ class WorkFlowManager(metaclass=Singleton):
except Exception as err:
logger.error(f"加载动作失败: {action.__name__} - {err}")
# 加载工作流事件触发器
self.load_workflow_events()
def stop(self):
"""
停止
"""
pass
self._actions = {}
self._event_workflows = {}
def excute(self, workflow_id: int, action: Action,
context: ActionContext = None) -> Tuple[bool, str, ActionContext]:
@@ -109,3 +120,175 @@ class WorkFlowManager(metaclass=Singleton):
}
} for key, action in self._actions.items()
]
def load_workflow_events(self, workflow_id: Optional[int] = None):
"""
加载工作流触发事件
"""
workflows = []
if workflow_id:
workflow = WorkflowOper().get(workflow_id)
if workflow:
workflows = [workflow]
else:
workflows = WorkflowOper().get_event_triggered_workflows()
try:
with self._lock:
for workflow in workflows:
# 确保先移除旧的事件监听器
self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type)
# 如果工作流是事件触发类型且未被禁用
if workflow.trigger_type == "event" and workflow.state != 'P':
# 注册事件触发器
self.register_workflow_event(workflow.id, workflow.event_type)
except Exception as e:
logger.error(f"加载事件触发工作流失败: {e}")
def register_workflow_event(self, workflow_id: int, event_type_str: str):
"""
注册工作流事件触发器
"""
try:
event_type = EventType(event_type_str)
except ValueError:
logger.error(f"无效的事件类型: {event_type_str}")
return
if event_type in EventType:
with self._lock:
# 确保先移除旧的事件监听器
self.remove_workflow_event(workflow_id, event_type.value)
# 添加新的事件监听器
eventmanager.add_event_listener(event_type, self._handle_event)
# 记录工作流事件触发器
if event_type.value not in self._event_workflows:
self._event_workflows[event_type.value] = []
self._event_workflows[event_type.value].append(workflow_id)
logger.info(f"已注册工作流 {workflow_id} 事件触发器: {event_type.value}")
def remove_workflow_event(self, workflow_id: int, event_type_str: str):
"""
移除工作流事件触发器
"""
try:
event_type = EventType(event_type_str)
except ValueError:
logger.error(f"无效的事件类型: {event_type_str}")
return
if event_type in EventType:
with self._lock:
eventmanager.remove_event_listener(event_type, self._handle_event)
if event_type.value in self._event_workflows:
if workflow_id in self._event_workflows[event_type.value]:
self._event_workflows[event_type.value].remove(workflow_id)
if not self._event_workflows[event_type.value]:
del self._event_workflows[event_type.value]
logger.info(f"已移除工作流 {workflow_id} 事件触发器")
def _handle_event(self, event: Event):
"""
处理事件,触发相应的工作流
"""
try:
event_type_str = str(event.event_type.value)
with self._lock:
if event_type_str not in self._event_workflows:
return
workflow_ids = self._event_workflows[event_type_str].copy()
for workflow_id in workflow_ids:
self._trigger_workflow(workflow_id, event)
except Exception as e:
logger.error(f"处理工作流事件失败: {e}")
def _trigger_workflow(self, workflow_id: int, event: Event):
"""
触发工作流执行
"""
try:
# 检查工作流是否存在且启用
workflow = WorkflowOper().get(workflow_id)
if not workflow or workflow.state == 'P':
return
# 检查事件条件
if not self._check_event_conditions(workflow, event):
logger.debug(f"工作流 {workflow.name} 事件条件不匹配,跳过执行")
return
# 检查工作流是否正在运行
if workflow.state == 'R':
logger.warning(f"工作流 {workflow.name} 正在运行中,跳过重复触发")
return
logger.info(f"事件 {event.event_type.value} 触发工作流: {workflow.name}")
# 发送工作流执行事件以启动工作流
eventmanager.send_event(EventType.WorkflowExecute, {
"workflow_id": workflow_id,
})
except Exception as e:
logger.error(f"触发工作流 {workflow_id} 失败: {e}")
def _check_event_conditions(self, workflow, event: Event) -> bool:
"""
检查事件是否满足工作流的触发条件
"""
if not workflow.event_conditions:
return True
conditions = workflow.event_conditions
event_data = event.event_data or {}
# 检查字段匹配条件
for field, expected_value in conditions.items():
if field not in event_data:
return False
actual_value = event_data[field]
# 支持多种条件匹配方式
if isinstance(expected_value, dict):
# 复杂条件匹配
if not self._check_complex_condition(actual_value, expected_value):
return False
else:
# 简单值匹配
if actual_value != expected_value:
return False
return True
@staticmethod
def _check_complex_condition(actual_value: any, condition: dict) -> bool:
"""
检查复杂条件匹配
支持的操作符equals, not_equals, contains, not_contains, in, not_in, regex
"""
for operator, expected_value in condition.items():
if operator == "equals":
if actual_value != expected_value:
return False
elif operator == "not_equals":
if actual_value == expected_value:
return False
elif operator == "contains":
if expected_value not in str(actual_value):
return False
elif operator == "not_contains":
if expected_value in str(actual_value):
return False
elif operator == "in":
if actual_value not in expected_value:
return False
elif operator == "not_in":
if actual_value in expected_value:
return False
elif operator == "regex":
import re
if not re.search(expected_value, str(actual_value)):
return False
return True
def get_event_workflows(self) -> dict:
"""
获取所有事件触发的工作流
"""
with self._lock:
return self._event_workflows.copy()

View File

@@ -18,6 +18,12 @@ class Workflow(Base):
description = Column(String)
# 定时器
timer = Column(String)
# 触发类型timer-定时触发 event-事件触发 manual-手动触发
trigger_type = Column(String, default='timer')
# 事件类型当trigger_type为event时使用
event_type = Column(String)
# 事件条件JSON格式用于过滤事件
event_conditions = Column(JSON, default=dict)
# 状态W-等待 R-运行中 P-暂停 S-成功 F-失败
state = Column(String, nullable=False, index=True, default='W')
# 已执行动作(,分隔)
@@ -47,6 +53,17 @@ class Workflow(Base):
def get_enabled_workflows(db):
return db.query(Workflow).filter(Workflow.state != 'P').all()
@staticmethod
@db_query
def get_event_triggered_workflows(db):
"""获取事件触发的工作流"""
return db.query(Workflow).filter(
and_(
Workflow.trigger_type == 'event',
Workflow.state != 'P'
)
).all()
@staticmethod
@db_query
def get_by_name(db, name: str):

View File

@@ -37,6 +37,12 @@ class WorkflowOper(DbOper):
"""
return Workflow.get_enabled_workflows(self._db)
def get_event_triggered_workflows(self) -> List[Workflow]:
"""
获取事件触发的工作流列表
"""
return Workflow.get_event_triggered_workflows(self._db)
def get_by_name(self, name: str) -> Workflow:
"""
按名称获取工作流

View File

@@ -65,6 +65,8 @@ class EventType(Enum):
ConfigChanged = "config.updated"
# 消息交互动作
MessageAction = "message.action"
# 执行工作流
WorkflowExecute = "workflow.execute"
# 同步链式事件

View File

@@ -17,6 +17,9 @@ class Workflow(BaseModel):
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
trigger_type: Optional[str] = Field(default='timer', description="触发类型timer-定时触发 event-事件触发 manual-手动触发")
event_type: Optional[str] = Field(default=None, description="事件类型当trigger_type为event时使用")
event_conditions: Optional[dict] = Field(default={}, description="事件条件JSON格式用于过滤事件")
state: Optional[str] = Field(default=None, description="状态")
current_action: Optional[str] = Field(default=None, description="已执行动作")
result: Optional[str] = Field(default=None, description="任务执行结果")
@@ -96,6 +99,9 @@ class WorkflowShare(BaseModel):
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
trigger_type: Optional[str] = Field(default=None, description="触发类型")
event_type: Optional[str] = Field(default=None, description="事件类型")
event_conditions: Optional[str] = Field(default=None, description="事件条件")
actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)")
flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)")
context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)")

View File

@@ -1,15 +1,16 @@
from app.core.workflow import WorkFlowManager
from app.chain.workflow import WorkflowChain
def init_workflow():
"""
初始化动作
初始化工作流
"""
WorkFlowManager()
def stop_workflow():
"""
停止动作
停止工作流
"""
WorkFlowManager().stop()

View File

@@ -0,0 +1,37 @@
"""2.1.8
Revision ID: 4666ce24a443
Revises: 3891a5e722a1
Create Date: 2025-07-22 13:54:04.196126
"""
import contextlib
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '4666ce24a443'
down_revision = '3891a5e722a1'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
# 添加触发类型字段
op.add_column('workflow', sa.Column('trigger_type', sa.String(), nullable=True, default='timer'))
with contextlib.suppress(Exception):
# 添加事件类型字段
op.add_column('workflow', sa.Column('event_type', sa.String(), nullable=True))
with contextlib.suppress(Exception):
# 添加事件条件字段
op.add_column('workflow', sa.Column('event_conditions', sa.JSON(), nullable=True, default={}))
# ### end Alembic commands ###
def downgrade() -> None:
pass