diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index b0f09f03..9d3d39be 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -14,8 +14,10 @@ from app.db import get_db from app.db.models import Workflow from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_user +from app.db.workflow_oper import WorkflowOper from app.helper.workflow import WorkflowHelper from app.scheduler import Scheduler +from app.schemas.types import EventType router = APIRouter() @@ -44,8 +46,9 @@ def create_workflow(workflow: schemas.Workflow, workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") if not workflow.state: workflow.state = "P" - from app.db.models.workflow import Workflow as WorkflowModel - WorkflowModel(**workflow.dict()).create(db) + if not workflow.trigger_type: + workflow.trigger_type = "timer" + Workflow(**workflow.dict()).create(db) return schemas.Response(success=True, message="创建工作流成功") @@ -65,6 +68,17 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> return WorkFlowManager().list_actions() +@router.get("/event_types", summary="获取所有事件类型", response_model=List[dict]) +def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 获取所有事件类型 + """ + return [{ + "title": event_type.name, + "value": event_type.value + } for event_type in EventType] + + @router.post("/share", summary="分享工作流", response_model=schemas.Response) def workflow_share( workflow: schemas.WorkflowShare, @@ -125,6 +139,9 @@ def workflow_fork( "name": workflow.name, "description": workflow.description, "timer": workflow.timer, + "trigger_type": workflow.trigger_type or "timer", + "event_type": workflow.event_type, + "event_conditions": json.loads(workflow.event_conditions or "{}") if workflow.event_conditions else {}, "actions": actions, "flows": flows, "context": context, @@ -182,8 +199,12 @@ def start_workflow(workflow_id: int, workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") - # 添加定时任务 - Scheduler().update_workflow_job(workflow) + if not workflow.event_type or workflow.event_type == "timer": + # 添加定时任务 + Scheduler().update_workflow_job(workflow) + else: + # 事件触发:添加到事件触发器 + WorkFlowManager().load_workflow_events(workflow_id) # 更新状态 workflow.update_state(db, workflow_id, "W") return schemas.Response(success=True) @@ -200,8 +221,13 @@ def pause_workflow(workflow_id: int, workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") - # 删除定时任务 - Scheduler().remove_workflow_job(workflow) + # 根据触发类型进行不同处理 + if workflow.trigger_type == "timer": + # 定时触发:移除定时任务 + Scheduler().remove_workflow_job(workflow) + elif workflow.trigger_type == "event": + # 事件触发:从事件触发器中移除 + WorkFlowManager().remove_workflow_event(workflow_id, workflow.event_type) # 停止工作流 global_vars.stop_workflow(workflow_id) # 更新状态 @@ -247,12 +273,13 @@ def update_workflow(workflow: schemas.Workflow, """ 更新工作流 """ - from app.db.workflow_oper import WorkflowOper if not workflow.id: return schemas.Response(success=False, message="工作流ID不能为空") wf = WorkflowOper(db).get(workflow.id) if not wf: return schemas.Response(success=False, message="工作流不存在") + if not wf.event_type: + workflow.event_type = "timer" wf.update(db, workflow.dict()) return schemas.Response(success=True, message="更新成功") @@ -264,15 +291,17 @@ def delete_workflow(workflow_id: int, """ 删除工作流 """ - from app.db.workflow_oper import WorkflowOper workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") - # 删除定时任务 - Scheduler().remove_workflow_job(workflow) + if not workflow.event_type or workflow.event_type == "timer": + # 定时触发:删除定时任务 + Scheduler().remove_workflow_job(workflow) + else: + # 事件触发:从事件触发器中移除 + WorkFlowManager().register_workflow_event(workflow_id, workflow.event_type) # 删除工作流 - from app.db.models.workflow import Workflow as WorkflowModel - WorkflowModel.delete(db, workflow_id) + Workflow.delete(db, workflow_id) # 删除缓存 SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") return schemas.Response(success=True, message="删除成功") diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 200524c7..bf2e2d1d 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -10,11 +10,13 @@ from pydantic.fields import Callable from app.chain import ChainBase from app.core.config import global_vars +from app.core.event import Event, eventmanager from app.core.workflow import WorkFlowManager from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger from app.schemas import ActionContext, ActionFlow, Action, ActionExecution +from app.schemas.types import EventType class WorkflowExecutor: @@ -188,6 +190,16 @@ class WorkflowChain(ChainBase): 工作流链 """ + @eventmanager.register(EventType.WorkflowExecute) + def event_process(self, event: Event): + """ + 事件触发工作流执行 + """ + workflow_id = event.event_data.get('workflow_id') + if not workflow_id: + return + self.process(workflow_id, from_begin=False) + @staticmethod def process(workflow_id: int, from_begin: Optional[bool] = True) -> Tuple[bool, str]: """ @@ -225,7 +237,7 @@ class WorkflowChain(ChainBase): logger.warn(f"工作流 {workflow.name} 无流程") return False, "工作流无流程" - logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") + logger.info(f"开始执行工作流 {workflow.name},共 {len(workflow.actions)} 个动作 ...") workflowoper.start(workflow_id) # 执行工作流 diff --git a/app/core/workflow.py b/app/core/workflow.py index 87730c85..de7531c0 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -1,10 +1,15 @@ +import threading from time import sleep -from typing import Dict, Any, Tuple, List +from typing import Dict, Any, Optional +from typing import List, Tuple from app.core.config import global_vars +from app.core.event import eventmanager, Event +from app.db.workflow_oper import WorkflowOper from app.helper.module import ModuleHelper from app.log import logger -from app.schemas import Action, ActionContext +from app.schemas import ActionContext, Action +from app.schemas.types import EventType from app.utils.singleton import Singleton @@ -15,7 +20,9 @@ class WorkFlowManager(metaclass=Singleton): def __init__(self): # 所有动作定义 + self._lock = threading.Lock() self._actions: Dict[str, Any] = {} + self._event_workflows: Dict[str, List[int]] = {} self.init() def init(self): @@ -48,11 +55,15 @@ class WorkFlowManager(metaclass=Singleton): except Exception as err: logger.error(f"加载动作失败: {action.__name__} - {err}") + # 加载工作流事件触发器 + self.load_workflow_events() + def stop(self): """ 停止 """ - pass + self._actions = {} + self._event_workflows = {} def excute(self, workflow_id: int, action: Action, context: ActionContext = None) -> Tuple[bool, str, ActionContext]: @@ -109,3 +120,175 @@ class WorkFlowManager(metaclass=Singleton): } } for key, action in self._actions.items() ] + + def load_workflow_events(self, workflow_id: Optional[int] = None): + """ + 加载工作流触发事件 + """ + workflows = [] + if workflow_id: + workflow = WorkflowOper().get(workflow_id) + if workflow: + workflows = [workflow] + else: + workflows = WorkflowOper().get_event_triggered_workflows() + try: + with self._lock: + for workflow in workflows: + # 确保先移除旧的事件监听器 + self.remove_workflow_event(workflow_id=workflow.id, event_type_str=workflow.event_type) + # 如果工作流是事件触发类型且未被禁用 + if workflow.trigger_type == "event" and workflow.state != 'P': + # 注册事件触发器 + self.register_workflow_event(workflow.id, workflow.event_type) + except Exception as e: + logger.error(f"加载事件触发工作流失败: {e}") + + def register_workflow_event(self, workflow_id: int, event_type_str: str): + """ + 注册工作流事件触发器 + """ + try: + event_type = EventType(event_type_str) + except ValueError: + logger.error(f"无效的事件类型: {event_type_str}") + return + if event_type in EventType: + with self._lock: + # 确保先移除旧的事件监听器 + self.remove_workflow_event(workflow_id, event_type.value) + # 添加新的事件监听器 + eventmanager.add_event_listener(event_type, self._handle_event) + # 记录工作流事件触发器 + if event_type.value not in self._event_workflows: + self._event_workflows[event_type.value] = [] + self._event_workflows[event_type.value].append(workflow_id) + logger.info(f"已注册工作流 {workflow_id} 事件触发器: {event_type.value}") + + def remove_workflow_event(self, workflow_id: int, event_type_str: str): + """ + 移除工作流事件触发器 + """ + try: + event_type = EventType(event_type_str) + except ValueError: + logger.error(f"无效的事件类型: {event_type_str}") + return + if event_type in EventType: + with self._lock: + eventmanager.remove_event_listener(event_type, self._handle_event) + if event_type.value in self._event_workflows: + if workflow_id in self._event_workflows[event_type.value]: + self._event_workflows[event_type.value].remove(workflow_id) + if not self._event_workflows[event_type.value]: + del self._event_workflows[event_type.value] + logger.info(f"已移除工作流 {workflow_id} 事件触发器") + + def _handle_event(self, event: Event): + """ + 处理事件,触发相应的工作流 + """ + try: + event_type_str = str(event.event_type.value) + with self._lock: + if event_type_str not in self._event_workflows: + return + workflow_ids = self._event_workflows[event_type_str].copy() + for workflow_id in workflow_ids: + self._trigger_workflow(workflow_id, event) + except Exception as e: + logger.error(f"处理工作流事件失败: {e}") + + def _trigger_workflow(self, workflow_id: int, event: Event): + """ + 触发工作流执行 + """ + try: + # 检查工作流是否存在且启用 + workflow = WorkflowOper().get(workflow_id) + if not workflow or workflow.state == 'P': + return + + # 检查事件条件 + if not self._check_event_conditions(workflow, event): + logger.debug(f"工作流 {workflow.name} 事件条件不匹配,跳过执行") + return + + # 检查工作流是否正在运行 + if workflow.state == 'R': + logger.warning(f"工作流 {workflow.name} 正在运行中,跳过重复触发") + return + + logger.info(f"事件 {event.event_type.value} 触发工作流: {workflow.name}") + + # 发送工作流执行事件以启动工作流 + eventmanager.send_event(EventType.WorkflowExecute, { + "workflow_id": workflow_id, + }) + + except Exception as e: + logger.error(f"触发工作流 {workflow_id} 失败: {e}") + + def _check_event_conditions(self, workflow, event: Event) -> bool: + """ + 检查事件是否满足工作流的触发条件 + """ + if not workflow.event_conditions: + return True + + conditions = workflow.event_conditions + event_data = event.event_data or {} + + # 检查字段匹配条件 + for field, expected_value in conditions.items(): + if field not in event_data: + return False + actual_value = event_data[field] + # 支持多种条件匹配方式 + if isinstance(expected_value, dict): + # 复杂条件匹配 + if not self._check_complex_condition(actual_value, expected_value): + return False + else: + # 简单值匹配 + if actual_value != expected_value: + return False + return True + + @staticmethod + def _check_complex_condition(actual_value: any, condition: dict) -> bool: + """ + 检查复杂条件匹配 + 支持的操作符:equals, not_equals, contains, not_contains, in, not_in, regex + """ + for operator, expected_value in condition.items(): + if operator == "equals": + if actual_value != expected_value: + return False + elif operator == "not_equals": + if actual_value == expected_value: + return False + elif operator == "contains": + if expected_value not in str(actual_value): + return False + elif operator == "not_contains": + if expected_value in str(actual_value): + return False + elif operator == "in": + if actual_value not in expected_value: + return False + elif operator == "not_in": + if actual_value in expected_value: + return False + elif operator == "regex": + import re + if not re.search(expected_value, str(actual_value)): + return False + return True + + def get_event_workflows(self) -> dict: + """ + 获取所有事件触发的工作流 + """ + with self._lock: + return self._event_workflows.copy() diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 78c6c2db..8d6668a7 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -18,6 +18,12 @@ class Workflow(Base): description = Column(String) # 定时器 timer = Column(String) + # 触发类型:timer-定时触发 event-事件触发 manual-手动触发 + trigger_type = Column(String, default='timer') + # 事件类型(当trigger_type为event时使用) + event_type = Column(String) + # 事件条件(JSON格式,用于过滤事件) + event_conditions = Column(JSON, default=dict) # 状态:W-等待 R-运行中 P-暂停 S-成功 F-失败 state = Column(String, nullable=False, index=True, default='W') # 已执行动作(,分隔) @@ -47,6 +53,17 @@ class Workflow(Base): def get_enabled_workflows(db): return db.query(Workflow).filter(Workflow.state != 'P').all() + @staticmethod + @db_query + def get_event_triggered_workflows(db): + """获取事件触发的工作流""" + return db.query(Workflow).filter( + and_( + Workflow.trigger_type == 'event', + Workflow.state != 'P' + ) + ).all() + @staticmethod @db_query def get_by_name(db, name: str): diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index d55f940a..5fa71cbc 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -37,6 +37,12 @@ class WorkflowOper(DbOper): """ return Workflow.get_enabled_workflows(self._db) + def get_event_triggered_workflows(self) -> List[Workflow]: + """ + 获取事件触发的工作流列表 + """ + return Workflow.get_event_triggered_workflows(self._db) + def get_by_name(self, name: str) -> Workflow: """ 按名称获取工作流 diff --git a/app/schemas/types.py b/app/schemas/types.py index 75191cd5..58b2db94 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -65,6 +65,8 @@ class EventType(Enum): ConfigChanged = "config.updated" # 消息交互动作 MessageAction = "message.action" + # 执行工作流 + WorkflowExecute = "workflow.execute" # 同步链式事件 diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index ff7b041b..57d5c3b5 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -17,6 +17,9 @@ class Workflow(BaseModel): name: Optional[str] = Field(default=None, description="工作流名称") description: Optional[str] = Field(default=None, description="工作流描述") timer: Optional[str] = Field(default=None, description="定时器") + trigger_type: Optional[str] = Field(default='timer', description="触发类型:timer-定时触发 event-事件触发 manual-手动触发") + event_type: Optional[str] = Field(default=None, description="事件类型(当trigger_type为event时使用)") + event_conditions: Optional[dict] = Field(default={}, description="事件条件(JSON格式,用于过滤事件)") state: Optional[str] = Field(default=None, description="状态") current_action: Optional[str] = Field(default=None, description="已执行动作") result: Optional[str] = Field(default=None, description="任务执行结果") @@ -96,6 +99,9 @@ class WorkflowShare(BaseModel): name: Optional[str] = Field(default=None, description="工作流名称") description: Optional[str] = Field(default=None, description="工作流描述") timer: Optional[str] = Field(default=None, description="定时器") + trigger_type: Optional[str] = Field(default=None, description="触发类型") + event_type: Optional[str] = Field(default=None, description="事件类型") + event_conditions: Optional[str] = Field(default=None, description="事件条件") actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)") flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)") context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)") diff --git a/app/startup/workflow_initializer.py b/app/startup/workflow_initializer.py index d81c0e6e..8ef0a37d 100644 --- a/app/startup/workflow_initializer.py +++ b/app/startup/workflow_initializer.py @@ -1,15 +1,16 @@ from app.core.workflow import WorkFlowManager +from app.chain.workflow import WorkflowChain def init_workflow(): """ - 初始化动作 + 初始化工作流 """ WorkFlowManager() def stop_workflow(): """ - 停止动作 + 停止工作流 """ WorkFlowManager().stop() diff --git a/database/versions/4666ce24a443_2_1_8.py b/database/versions/4666ce24a443_2_1_8.py new file mode 100644 index 00000000..dfdfef24 --- /dev/null +++ b/database/versions/4666ce24a443_2_1_8.py @@ -0,0 +1,37 @@ +"""2.1.8 + +Revision ID: 4666ce24a443 +Revises: 3891a5e722a1 +Create Date: 2025-07-22 13:54:04.196126 + +""" +import contextlib + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '4666ce24a443' +down_revision = '3891a5e722a1' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with contextlib.suppress(Exception): + # 添加触发类型字段 + op.add_column('workflow', sa.Column('trigger_type', sa.String(), nullable=True, default='timer')) + + with contextlib.suppress(Exception): + # 添加事件类型字段 + op.add_column('workflow', sa.Column('event_type', sa.String(), nullable=True)) + + with contextlib.suppress(Exception): + # 添加事件条件字段 + op.add_column('workflow', sa.Column('event_conditions', sa.JSON(), nullable=True, default={})) + # ### end Alembic commands ### + + +def downgrade() -> None: + pass