diff --git a/app/chain/workflow.py b/app/chain/workflow.py index c0472a34..b2326917 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1,8 +1,10 @@ from typing import List from app.chain import ChainBase +from app.core.workflow import WorkFlowManager from app.db.workflow_oper import WorkflowOper -from app.schemas import Workflow +from app.log import logger +from app.schemas import Workflow, ActionContext class WorkflowChain(ChainBase): @@ -13,15 +15,36 @@ class WorkflowChain(ChainBase): def __init__(self): super().__init__() self.workflowoper = WorkflowOper() + self.workflowmanager = WorkFlowManager() def process(self, workflow_id: int) -> bool: """ 处理工作流 """ - pass + workflow = self.workflowoper.get(workflow_id) + if not workflow: + logger.warn(f"工作流 {workflow_id} 不存在") + return False + if not workflow.actions: + logger.warn(f"工作流 {workflow.name} 无动作") + return False + logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") + # 启用上下文 + context = ActionContext() + self.workflowoper.start(workflow_id) + for action in workflow.actions: + state, context = self.workflowmanager.excute(action, context) + self.workflowoper.step(workflow_id, action=action.name, context=context.dict()) + if not state: + logger.error(f"动作 {action.name} 执行失败,工作流失败") + self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败") + return False + logger.info(f"工作流 {workflow.name} 执行完成") + self.workflowoper.success(workflow_id) + return True def get_workflows(self) -> List[Workflow]: """ 获取工作流列表 """ - pass + return self.workflowoper.list_enabled() diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index 43921f28..a85d9c7c 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -2,7 +2,7 @@ from datetime import datetime from sqlalchemy import Column, Integer, JSON, Sequence, String -from app.db import Base, db_query +from app.db import Base, db_query, db_update class Workflow(Base): @@ -43,3 +43,45 @@ class Workflow(Base): @db_query def get_by_name(db, name: str): return db.query(Workflow).filter(Workflow.name == name).first() + + @staticmethod + @db_update + def update_state(db, wid: int, state: str): + db.query(Workflow).filter(Workflow.id == wid).update({"state": state}) + return True + + @staticmethod + @db_update + def start(db, wid: int): + db.query(Workflow).filter(Workflow.id == wid).update({ + "state": 'R' + }) + return True + + @staticmethod + @db_update + def fail(db, wid: int, result: str): + db.query(Workflow).filter(Workflow.id == wid).update({ + "state": 'F', + "result": result, + "run_count": Workflow.run_count + 1, + "last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') + }) + return True + + @staticmethod + @db_update + def success(db, wid: int, result: str = None): + db.query(Workflow).filter(Workflow.id == wid).update({ + "state": 'S', + "result": result, + "run_count": Workflow.run_count + 1, + "last_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') + }) + return True + + @staticmethod + @db_update + def update_current_action(db, wid: int, action: str, context: dict): + db.query(Workflow).filter(Workflow.id == wid).update({"current_action": action, "context": context}) + return True diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py index 8d9df96f..e73f9328 100644 --- a/app/db/workflow_oper.py +++ b/app/db/workflow_oper.py @@ -36,3 +36,27 @@ class WorkflowOper(DbOper): 按名称获取工作流 """ return Workflow.get_by_name(self._db, name) + + def start(self, wid: int) -> bool: + """ + 启动 + """ + return Workflow.start(self._db, wid) + + def success(self, wid: int, result: str = None) -> bool: + """ + 成功 + """ + return Workflow.success(self._db, wid, result) + + def fail(self, wid: int, result: str) -> bool: + """ + 失败 + """ + return Workflow.fail(self._db, wid, result) + + def step(self, wid: int, action: str, context: dict) -> bool: + """ + 步进 + """ + return Workflow.update_current_action(self._db, wid, action, context) diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 45a90906..6b132db0 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -26,6 +26,9 @@ class Workflow(BaseModel): add_time: Optional[str] = Field(None, description="创建时间") last_time: Optional[str] = Field(None, description="最后执行时间") + class Config: + orm_mode = True + class ActionParams(BaseModel): """