From 2a3ea8315ddf2e26657b0d6027b1739cb97f7e84 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 9 Jul 2025 00:19:47 +0800 Subject: [PATCH] fix workflow --- WORKFLOW_SHARE_README.md | 180 ---------------------------------- app/api/endpoints/workflow.py | 66 ++++++------- app/core/config.py | 4 +- app/helper/workflow.py | 21 ++-- 4 files changed, 40 insertions(+), 231 deletions(-) delete mode 100644 WORKFLOW_SHARE_README.md diff --git a/WORKFLOW_SHARE_README.md b/WORKFLOW_SHARE_README.md deleted file mode 100644 index 52d0f7af..00000000 --- a/WORKFLOW_SHARE_README.md +++ /dev/null @@ -1,180 +0,0 @@ -# 工作流分享功能 - -## 概述 - -基于订阅分享的相关API接口和helper类,新增了工作流分享相关接口和helper,以实现共享公共服务器的相关接口给前端调用,与订阅使用的是同一个服务器。 - -## 功能特性 - -1. **工作流分享** - 将本地工作流分享到公共服务器 -2. **分享管理** - 查看、删除已分享的工作流 -3. **工作流复用** - 从公共服务器复用其他用户分享的工作流 -4. **缓存机制** - 使用缓存提高查询性能 - -## 文件结构 - -### 新增文件 -- `app/schemas/workflow.py` - 新增 `WorkflowShare` schema类 -- `app/helper/workflow.py` - 新增工作流分享helper类 - -### 修改文件 -- `app/api/endpoints/workflow.py` - 新增工作流分享相关API接口,使用WorkflowOper进行数据库操作 -- `app/db/workflow_oper.py` - 新增list方法 -- `app/db/models/workflow.py` - 新增list静态方法 -- `app/core/config.py` - 新增WORKFLOW_STATISTIC_SHARE配置项 - -## API接口 - -### 1. 分享工作流 -``` -POST /api/v1/workflow/share -``` - -**请求参数:** -```json -{ - "id": 1, - "share_title": "我的工作流", - "share_comment": "这是一个自动化工作流", - "share_user": "用户名" -} -``` - -**响应:** -```json -{ - "success": true, - "message": "success" -} -``` - -### 2. 删除分享 -``` -DELETE /api/v1/workflow/share/{share_id} -``` - -**响应:** -```json -{ - "success": true, - "message": "success" -} -``` - -### 3. 复用工作流 -``` -POST /api/v1/workflow/fork -``` - -**请求参数:** -```json -{ - "id": 1, - "name": "工作流名称", - "description": "工作流描述", - "timer": "0 0 * * *", - "actions": "[{\"id\": \"action1\", \"type\": \"test\"}]", - "flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]", - "context": "{}" -} -``` - -**响应:** -```json -{ - "success": true, - "message": "复用成功" -} -``` - -### 4. 查询分享的工作流 -``` -GET /api/v1/workflow/shares?name=关键词&page=1&count=30 -``` - -**响应:** -```json -[ - { - "id": 1, - "share_title": "我的工作流", - "share_comment": "这是一个自动化工作流", - "share_user": "用户名", - "share_uid": "user_uuid", - "name": "工作流名称", - "description": "工作流描述", - "timer": "0 0 * * *", - "actions": "[{\"id\": \"action1\", \"type\": \"test\"}]", - "flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]", - "context": "{}", - "date": "2024-01-01 12:00:00", - "count": 5 - } -] -``` - -## 配置说明 - -工作流分享功能使用独立的配置项 `WORKFLOW_STATISTIC_SHARE`,当该配置为 `true` 时,工作流分享功能才会启用。 - -### 配置项 -- `WORKFLOW_STATISTIC_SHARE`: 工作流数据共享开关,默认为 `true` - -## 服务器接口 - -工作流分享功能与订阅分享使用同一个服务器,服务器接口定义如下: - -```python -class WorkflowShareItem(BaseModel): - id: Optional[int] = None - share_title: Optional[str] = None - share_comment: Optional[str] = None - share_user: Optional[str] = None - share_uid: Optional[str] = None - name: Optional[str] = None - description: Optional[str] = None - timer: Optional[str] = None - actions: Optional[str] = None # JSON字符串 - flows: Optional[str] = None # JSON字符串 - context: Optional[str] = None # JSON字符串 - date: Optional[str] = None - -# 工作流分享相关接口 -@App.post("/workflow/share") -def workflow_share(workflow: WorkflowShareItem, db: Session = Depends(get_db)): - """新增工作流分享""" - -@App.delete("/workflow/share/{sid}") -def workflow_share_delete(sid: int, share_uid: str, db: Session = Depends(get_db)): - """删除工作流分享""" - -@App.get("/workflow/shares") -def workflow_shares(name: str = None, page: int = 1, count: int = 30, db: Session = Depends(get_db)): - """查询分享的工作流""" - -@App.get("/workflow/fork/{shareid}") -def workflow_fork(shareid: int, db: Session = Depends(get_db)): - """复用分享的工作流""" -``` - -## 使用说明 - -1. **启用功能**: 确保 `WORKFLOW_STATISTIC_SHARE` 配置为 `true` -2. **分享工作流**: 通过API接口分享本地工作流到公共服务器 -3. **查看分享**: 查询公共服务器上的工作流分享 -4. **复用工作流**: 将其他用户分享的工作流复制到本地使用 -5. **管理分享**: 删除自己分享的工作流 - -## 技术改进 - -1. **独立配置**: 工作流分享功能使用独立的配置开关,不再依赖订阅分享配置 -2. **数据访问层**: 使用WorkflowOper进行数据库操作,提高代码的可维护性和一致性 -3. **错误处理**: 完善的错误处理和参数验证 -4. **类型安全**: 修复了所有类型相关的linter错误 - -## 注意事项 - -1. 工作流分享功能需要网络连接才能访问公共服务器 -2. 复用的工作流默认状态为暂停,需要手动启用 -3. 工作流名称不能重复,复用时会检查本地是否存在同名工作流 -4. 分享的工作流数据以JSON字符串形式存储,包含actions、flows、context等字段 \ No newline at end of file diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 9431692d..53bb7854 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -1,21 +1,21 @@ +import json from datetime import datetime from typing import List, Any, Optional -import json from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from app import schemas +from app.chain.workflow import WorkflowChain from app.core.config import global_vars from app.core.plugin import PluginManager from app.core.workflow import WorkFlowManager from app.db import get_db -from app.db.models.workflow import Workflow +from app.db.models import Workflow from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_user -from app.chain.workflow import WorkflowChain -from app.scheduler import Scheduler from app.helper.workflow import WorkflowHelper +from app.scheduler import Scheduler router = APIRouter() @@ -95,8 +95,8 @@ def update_workflow(workflow: schemas.Workflow, @router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response) def delete_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: """ 删除工作流 """ @@ -116,18 +116,18 @@ def delete_workflow(workflow_id: int, @router.post("/share", summary="分享工作流", response_model=schemas.Response) def workflow_share( - workflow_share: schemas.WorkflowShare, + workflow: schemas.WorkflowShare, _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: """ 分享工作流 """ - if not workflow_share.id or not workflow_share.share_title or not workflow_share.share_user: + if not workflow.id or not workflow.share_title or not workflow.share_user: return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人") - - state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow_share.id, - share_title=workflow_share.share_title or "", - share_comment=workflow_share.share_comment or "", - share_user=workflow_share.share_user or "") + + state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow.id, + share_title=workflow.share_title or "", + share_comment=workflow.share_comment or "", + share_user=workflow.share_user or "") return schemas.Response(success=state, message=errmsg) @@ -144,56 +144,54 @@ def workflow_share_delete( @router.post("/fork", summary="复用工作流", response_model=schemas.Response) def workflow_fork( - workflow_share: schemas.WorkflowShare, + workflow: schemas.WorkflowShare, db: Session = Depends(get_db), - current_user: schemas.User = Depends(get_current_active_user)) -> Any: + _: schemas.User = Depends(get_current_active_user)) -> Any: """ 复用工作流 """ - if not workflow_share.name: + if not workflow.name: return schemas.Response(success=False, message="工作流名称不能为空") - + # 解析JSON数据,添加错误处理 try: - actions = json.loads(workflow_share.actions or "[]") + actions = json.loads(workflow.actions or "[]") except json.JSONDecodeError: return schemas.Response(success=False, message="actions字段JSON格式错误") - + try: - flows = json.loads(workflow_share.flows or "[]") + flows = json.loads(workflow.flows or "[]") except json.JSONDecodeError: return schemas.Response(success=False, message="flows字段JSON格式错误") - + try: - context = json.loads(workflow_share.context or "{}") + context = json.loads(workflow.context or "{}") except json.JSONDecodeError: return schemas.Response(success=False, message="context字段JSON格式错误") - + # 创建工作流 workflow_dict = { - "name": workflow_share.name, - "description": workflow_share.description, - "timer": workflow_share.timer, + "name": workflow.name, + "description": workflow.description, + "timer": workflow.timer, "actions": actions, "flows": flows, "context": context, "state": "P" # 默认暂停状态 } - + # 检查名称是否重复 - from app.db.workflow_oper import WorkflowOper - if WorkflowOper(db).get_by_name(workflow_dict["name"]): + if Workflow.get_by_name(db, workflow_dict["name"]): return schemas.Response(success=False, message="已存在相同名称的工作流") - + # 创建新工作流 - from app.db.models.workflow import Workflow as WorkflowModel - workflow = WorkflowModel(**workflow_dict) + workflow = Workflow(**workflow_dict) workflow.create(db) - + # 更新复用次数 if workflow_share.id: WorkflowHelper().workflow_fork(share_id=workflow_share.id) - + return schemas.Response(success=True, message="复用成功") diff --git a/app/core/config.py b/app/core/config.py index 61035cd1..0794c3a0 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -201,8 +201,6 @@ class ConfigModel(BaseModel): SUBSCRIBE_RSS_INTERVAL: int = 30 # 订阅数据共享 SUBSCRIBE_STATISTIC_SHARE: bool = True - # 工作流数据共享 - WORKFLOW_STATISTIC_SHARE: bool = True # 订阅搜索开关 SUBSCRIBE_SEARCH: bool = False # 检查本地媒体库是否存在资源开关 @@ -313,6 +311,8 @@ class ConfigModel(BaseModel): DEFAULT_SUB: Optional[str] = "zh-cn" # Docker Client API地址 DOCKER_CLIENT_API: Optional[str] = "tcp://127.0.0.1:38379" + # 工作流数据共享 + WORKFLOW_STATISTIC_SHARE: bool = True class Settings(BaseSettings, ConfigModel, LogConfigModel): diff --git a/app/helper/workflow.py b/app/helper/workflow.py index 4cdbd5a5..db24c5a7 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -1,13 +1,9 @@ -from threading import Thread from typing import List, Tuple, Optional -import json from app.core.cache import cached, cache_backend from app.core.config import settings from app.db.workflow_oper import WorkflowOper -from app.db.systemconfig_oper import SystemConfigOper from app.log import logger -from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils from app.utils.singleton import WeakSingleton from app.utils.system import SystemUtils @@ -40,17 +36,12 @@ class WorkflowHelper(metaclass=WeakSingleton): return False, "当前没有开启工作流数据共享功能" # 获取工作流信息 - from app.db import get_db - db = next(get_db()) - try: - workflow = WorkflowOper(db).get(workflow_id) - if not workflow: - return False, "工作流不存在" - - workflow_dict = workflow.to_dict() - workflow_dict.pop("id") - finally: - db.close() + workflow = WorkflowOper().get(workflow_id) + if not workflow: + return False, "工作流不存在" + + workflow_dict = workflow.to_dict() + workflow_dict.pop("id") # 清除缓存 cache_backend.clear(region=self._shares_cache_region)