From b852acec28eb57c0452014401bceccd5c195db30 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 9 Jul 2025 09:34:53 +0800 Subject: [PATCH] fix workflow --- app/api/endpoints/system.py | 22 ++++---- app/api/endpoints/workflow.py | 102 +++++++++++++++++----------------- app/helper/workflow.py | 14 +++-- 3 files changed, 72 insertions(+), 66 deletions(-) diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index 7c9f2dfb..c47c529a 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -11,6 +11,7 @@ from typing import Optional, Union, Annotated import aiofiles import pillow_avif # noqa 用于自动注册AVIF支持 from PIL import Image +from app.helper.sites import SitesHelper from fastapi import APIRouter, Body, Depends, HTTPException, Header, Request, Response from fastapi.responses import StreamingResponse @@ -18,10 +19,10 @@ from app import schemas from app.chain.search import SearchChain from app.chain.system import SystemChain from app.core.config import global_vars, settings +from app.core.event import eventmanager from app.core.metainfo import MetaInfo from app.core.module import ModuleManager from app.core.security import verify_apitoken, verify_resource_token, verify_token -from app.core.event import eventmanager from app.db.models import User from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser @@ -29,7 +30,6 @@ from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper from app.helper.rule import RuleHelper -from app.helper.sites import SitesHelper from app.helper.subscribe import SubscribeHelper from app.helper.system import SystemHelper from app.log import logger @@ -187,9 +187,11 @@ def get_global_setting(token: str): "COOKIECLOUD_KEY", "COOKIECLOUD_PASSWORD", "GITHUB_TOKEN", "REPO_GITHUB_TOKEN"} ) # 追加用户唯一ID和订阅分享管理权限 + share_admin = SubscribeHelper().is_admin_user() info.update({ "USER_UNIQUE_ID": SubscribeHelper().get_user_uuid(), - "SUBSCRIBE_SHARE_MANAGE": SubscribeHelper().is_admin_user(), + "SUBSCRIBE_SHARE_MANAGE": share_admin, + "WORKFLOW_SHARE_MANAGE": share_admin }) return schemas.Response(success=True, data=info) @@ -290,9 +292,9 @@ def get_setting(key: str, @router.post("/setting/{key}", summary="更新系统设置", response_model=schemas.Response) def set_setting( - key: str, - value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None, - _: User = Depends(get_current_active_superuser), + key: str, + value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None, + _: User = Depends(get_current_active_superuser), ): """ 更新系统设置(仅管理员) @@ -452,10 +454,10 @@ def ruletest(title: str, @router.get("/nettest", summary="测试网络连通性") def nettest( - url: str, - proxy: bool, - include: Optional[str] = None, - _: schemas.TokenPayload = Depends(verify_token), + url: str, + proxy: bool, + include: Optional[str] = None, + _: schemas.TokenPayload = Depends(verify_token), ): """ 测试网络连通性 diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 53bb7854..b0f09f03 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -65,55 +65,6 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> return WorkFlowManager().list_actions() -@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow) -def get_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: - """ - 获取工作流详情 - """ - from app.db.workflow_oper import WorkflowOper - return WorkflowOper(db).get(workflow_id) - - -@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response) -def update_workflow(workflow: schemas.Workflow, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: - """ - 更新工作流 - """ - from app.db.workflow_oper import WorkflowOper - if not workflow.id: - return schemas.Response(success=False, message="工作流ID不能为空") - wf = WorkflowOper(db).get(workflow.id) - if not wf: - return schemas.Response(success=False, message="工作流不存在") - wf.update(db, workflow.dict()) - return schemas.Response(success=True, message="更新成功") - - -@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response) -def delete_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: - """ - 删除工作流 - """ - from app.db.workflow_oper import WorkflowOper - workflow = WorkflowOper(db).get(workflow_id) - if not workflow: - return schemas.Response(success=False, message="工作流不存在") - # 删除定时任务 - Scheduler().remove_workflow_job(workflow) - # 删除工作流 - from app.db.models.workflow import Workflow as WorkflowModel - WorkflowModel.delete(db, workflow_id) - # 删除缓存 - SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") - return schemas.Response(success=True, message="删除成功") - - @router.post("/share", summary="分享工作流", response_model=schemas.Response) def workflow_share( workflow: schemas.WorkflowShare, @@ -189,8 +140,8 @@ def workflow_fork( workflow.create(db) # 更新复用次数 - if workflow_share.id: - WorkflowHelper().workflow_fork(share_id=workflow_share.id) + if workflow.id: + WorkflowHelper().workflow_fork(share_id=workflow.id) return schemas.Response(success=True, message="复用成功") @@ -276,3 +227,52 @@ def reset_workflow(workflow_id: int, # 删除缓存 SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") return schemas.Response(success=True) + + +@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow) +def get_workflow(workflow_id: int, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 获取工作流详情 + """ + from app.db.workflow_oper import WorkflowOper + return WorkflowOper(db).get(workflow_id) + + +@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response) +def update_workflow(workflow: schemas.Workflow, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 更新工作流 + """ + from app.db.workflow_oper import WorkflowOper + if not workflow.id: + return schemas.Response(success=False, message="工作流ID不能为空") + wf = WorkflowOper(db).get(workflow.id) + if not wf: + return schemas.Response(success=False, message="工作流不存在") + wf.update(db, workflow.dict()) + return schemas.Response(success=True, message="更新成功") + + +@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response) +def delete_workflow(workflow_id: int, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 删除工作流 + """ + from app.db.workflow_oper import WorkflowOper + workflow = WorkflowOper(db).get(workflow_id) + if not workflow: + return schemas.Response(success=False, message="工作流不存在") + # 删除定时任务 + Scheduler().remove_workflow_job(workflow) + # 删除工作流 + from app.db.models.workflow import Workflow as WorkflowModel + WorkflowModel.delete(db, workflow_id) + # 删除缓存 + SystemConfigOper().delete(f"WorkflowCache-{workflow_id}") + return schemas.Response(success=True, message="删除成功") diff --git a/app/helper/workflow.py b/app/helper/workflow.py index db24c5a7..729f1a24 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -1,3 +1,4 @@ +import json from typing import List, Tuple, Optional from app.core.cache import cached, cache_backend @@ -40,12 +41,15 @@ class WorkflowHelper(metaclass=WeakSingleton): if not workflow: return False, "工作流不存在" + if not workflow.actions or not workflow.flows: + return False, "请分享有动作和流程的工作流" + workflow_dict = workflow.to_dict() - workflow_dict.pop("id") - - # 清除缓存 - cache_backend.clear(region=self._shares_cache_region) - + workflow_dict.pop("id", None) + workflow_dict.pop("context", None) + workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or []) + workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or []) + # 发送分享请求 res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json", timeout=10).post(self._workflow_share,