diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index ab95b136..9431692d 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -1,4 +1,4 @@ -THIS SHOULD BE A LINTER ERRORfrom datetime import datetime +from datetime import datetime from typing import List, Any, Optional import json @@ -27,7 +27,7 @@ def list_workflows(db: Session = Depends(get_db), 获取工作流列表 """ from app.db.workflow_oper import WorkflowOper - return WorkflowOper().list() + return WorkflowOper(db).list() @router.post("/", summary="创建工作流", response_model=schemas.Response) @@ -38,7 +38,7 @@ def create_workflow(workflow: schemas.Workflow, 创建工作流 """ from app.db.workflow_oper import WorkflowOper - if workflow.name and WorkflowOper().get_by_name(workflow.name): + if workflow.name and WorkflowOper(db).get_by_name(workflow.name): return schemas.Response(success=False, message="已存在相同名称的工作流") if not workflow.add_time: workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") @@ -73,7 +73,7 @@ def get_workflow(workflow_id: int, 获取工作流详情 """ from app.db.workflow_oper import WorkflowOper - return WorkflowOper().get(workflow_id) + return WorkflowOper(db).get(workflow_id) @router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response) @@ -86,7 +86,7 @@ def update_workflow(workflow: schemas.Workflow, from app.db.workflow_oper import WorkflowOper if not workflow.id: return schemas.Response(success=False, message="工作流ID不能为空") - wf = WorkflowOper().get(workflow.id) + wf = WorkflowOper(db).get(workflow.id) if not wf: return schemas.Response(success=False, message="工作流不存在") wf.update(db, workflow.dict()) @@ -101,7 +101,7 @@ def delete_workflow(workflow_id: int, 删除工作流 """ from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper().get(workflow_id) + workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 删除定时任务 @@ -145,6 +145,7 @@ def workflow_share_delete( @router.post("/fork", summary="复用工作流", response_model=schemas.Response) def workflow_fork( workflow_share: schemas.WorkflowShare, + db: Session = Depends(get_db), current_user: schemas.User = Depends(get_current_active_user)) -> Any: """ 复用工作流 @@ -152,26 +153,40 @@ def workflow_fork( if not workflow_share.name: return schemas.Response(success=False, message="工作流名称不能为空") + # 解析JSON数据,添加错误处理 + try: + actions = json.loads(workflow_share.actions or "[]") + except json.JSONDecodeError: + return schemas.Response(success=False, message="actions字段JSON格式错误") + + try: + flows = json.loads(workflow_share.flows or "[]") + except json.JSONDecodeError: + return schemas.Response(success=False, message="flows字段JSON格式错误") + + try: + context = json.loads(workflow_share.context or "{}") + except json.JSONDecodeError: + return schemas.Response(success=False, message="context字段JSON格式错误") + # 创建工作流 workflow_dict = { "name": workflow_share.name, "description": workflow_share.description, "timer": workflow_share.timer, - "actions": json.loads(workflow_share.actions or "[]"), - "flows": json.loads(workflow_share.flows or "[]"), - "context": json.loads(workflow_share.context or "{}"), + "actions": actions, + "flows": flows, + "context": context, "state": "P" # 默认暂停状态 } # 检查名称是否重复 from app.db.workflow_oper import WorkflowOper - if WorkflowOper().get_by_name(workflow_dict["name"]): + if WorkflowOper(db).get_by_name(workflow_dict["name"]): return schemas.Response(success=False, message="已存在相同名称的工作流") # 创建新工作流 from app.db.models.workflow import Workflow as WorkflowModel - from app.db import get_db - db = next(get_db()) workflow = WorkflowModel(**workflow_dict) workflow.create(db) @@ -215,7 +230,7 @@ def start_workflow(workflow_id: int, 启用工作流 """ from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper().get(workflow_id) + workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 添加定时任务 @@ -233,7 +248,7 @@ def pause_workflow(workflow_id: int, 停用工作流 """ from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper().get(workflow_id) + workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 删除定时任务 @@ -253,7 +268,7 @@ def reset_workflow(workflow_id: int, 重置工作流 """ from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper().get(workflow_id) + workflow = WorkflowOper(db).get(workflow_id) if not workflow: return schemas.Response(success=False, message="工作流不存在") # 停止工作流 diff --git a/app/helper/workflow.py b/app/helper/workflow.py index 5cf49dc4..4cdbd5a5 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -40,12 +40,17 @@ class WorkflowHelper(metaclass=WeakSingleton): return False, "当前没有开启工作流数据共享功能" # 获取工作流信息 - workflow = WorkflowOper().get(workflow_id) - if not workflow: - return False, "工作流不存在" - - workflow_dict = workflow.to_dict() - workflow_dict.pop("id") + from app.db import get_db + db = next(get_db()) + try: + workflow = WorkflowOper(db).get(workflow_id) + if not workflow: + return False, "工作流不存在" + + workflow_dict = workflow.to_dict() + workflow_dict.pop("id") + finally: + db.close() # 清除缓存 cache_backend.clear(region=self._shares_cache_region)