diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 5bca510f..c0472a34 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1,6 +1,7 @@ from typing import List from app.chain import ChainBase +from app.db.workflow_oper import WorkflowOper from app.schemas import Workflow @@ -9,6 +10,10 @@ class WorkflowChain(ChainBase): 工作流链 """ + def __init__(self): + super().__init__() + self.workflowoper = WorkflowOper() + def process(self, workflow_id: int) -> bool: """ 处理工作流 diff --git a/app/db/models/workflow.py b/app/db/models/workflow.py index fe398730..43921f28 100644 --- a/app/db/models/workflow.py +++ b/app/db/models/workflow.py @@ -2,7 +2,7 @@ from datetime import datetime from sqlalchemy import Column, Integer, JSON, Sequence, String -from app.db import Base +from app.db import Base, db_query class Workflow(Base): @@ -17,8 +17,8 @@ class Workflow(Base): description = Column(String) # 定时器 timer = Column(String) - # 状态:N-新建 R-运行中 P-暂停 S-成功 F-失败 - state = Column(String, nullable=False, index=True, default='N') + # 状态:W-等待 R-运行中 P-暂停 S-成功 F-失败 + state = Column(String, nullable=False, index=True, default='W') # 当前执行动作 current_action = Column(String) # 任务执行结果 @@ -33,3 +33,13 @@ class Workflow(Base): add_time = Column(String, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S')) # 最后执行时间 last_time = Column(String) + + @staticmethod + @db_query + def get_enabled_workflows(db): + return db.query(Workflow).filter(Workflow.state != 'P').all() + + @staticmethod + @db_query + def get_by_name(db, name: str): + return db.query(Workflow).filter(Workflow.name == name).first() diff --git a/app/db/workflow_oper.py b/app/db/workflow_oper.py new file mode 100644 index 00000000..8d9df96f --- /dev/null +++ b/app/db/workflow_oper.py @@ -0,0 +1,38 @@ +from typing import List, Tuple + +from app.db import DbOper +from app.db.models.workflow import Workflow + + +class WorkflowOper(DbOper): + """ + 工作流管理 + """ + + def add(self, **kwargs) -> Tuple[bool, str]: + """ + 新增工作流 + """ + wf = Workflow(**kwargs) + if not wf.get_by_name(self._db, kwargs.get("name")): + wf.create(self._db) + return True, "新增工作流成功" + return False, "工作流已存在" + + def get(self, wid: int) -> Workflow: + """ + 查询单个工作流 + """ + return Workflow.get(self._db, wid) + + def list_enabled(self) -> List[Workflow]: + """ + 获取启用的工作流列表 + """ + return Workflow.get_enabled_workflows(self._db) + + def get_by_name(self, name: str) -> Workflow: + """ + 按名称获取工作流 + """ + return Workflow.get_by_name(self._db, name)