fix workflow

This commit is contained in:
jxxghp
2025-07-09 00:19:47 +08:00
parent 9271ee833c
commit 2a3ea8315d
4 changed files with 40 additions and 231 deletions

View File

@@ -1,180 +0,0 @@
# 工作流分享功能
## 概述
基于订阅分享的相关API接口和helper类新增了工作流分享相关接口和helper以实现共享公共服务器的相关接口给前端调用与订阅使用的是同一个服务器。
## 功能特性
1. **工作流分享** - 将本地工作流分享到公共服务器
2. **分享管理** - 查看、删除已分享的工作流
3. **工作流复用** - 从公共服务器复用其他用户分享的工作流
4. **缓存机制** - 使用缓存提高查询性能
## 文件结构
### 新增文件
- `app/schemas/workflow.py` - 新增 `WorkflowShare` schema类
- `app/helper/workflow.py` - 新增工作流分享helper类
### 修改文件
- `app/api/endpoints/workflow.py` - 新增工作流分享相关API接口使用WorkflowOper进行数据库操作
- `app/db/workflow_oper.py` - 新增list方法
- `app/db/models/workflow.py` - 新增list静态方法
- `app/core/config.py` - 新增WORKFLOW_STATISTIC_SHARE配置项
## API接口
### 1. 分享工作流
```
POST /api/v1/workflow/share
```
**请求参数:**
```json
{
"id": 1,
"share_title": "我的工作流",
"share_comment": "这是一个自动化工作流",
"share_user": "用户名"
}
```
**响应:**
```json
{
"success": true,
"message": "success"
}
```
### 2. 删除分享
```
DELETE /api/v1/workflow/share/{share_id}
```
**响应:**
```json
{
"success": true,
"message": "success"
}
```
### 3. 复用工作流
```
POST /api/v1/workflow/fork
```
**请求参数:**
```json
{
"id": 1,
"name": "工作流名称",
"description": "工作流描述",
"timer": "0 0 * * *",
"actions": "[{\"id\": \"action1\", \"type\": \"test\"}]",
"flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]",
"context": "{}"
}
```
**响应:**
```json
{
"success": true,
"message": "复用成功"
}
```
### 4. 查询分享的工作流
```
GET /api/v1/workflow/shares?name=关键词&page=1&count=30
```
**响应:**
```json
[
{
"id": 1,
"share_title": "我的工作流",
"share_comment": "这是一个自动化工作流",
"share_user": "用户名",
"share_uid": "user_uuid",
"name": "工作流名称",
"description": "工作流描述",
"timer": "0 0 * * *",
"actions": "[{\"id\": \"action1\", \"type\": \"test\"}]",
"flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]",
"context": "{}",
"date": "2024-01-01 12:00:00",
"count": 5
}
]
```
## 配置说明
工作流分享功能使用独立的配置项 `WORKFLOW_STATISTIC_SHARE`,当该配置为 `true` 时,工作流分享功能才会启用。
### 配置项
- `WORKFLOW_STATISTIC_SHARE`: 工作流数据共享开关,默认为 `true`
## 服务器接口
工作流分享功能与订阅分享使用同一个服务器,服务器接口定义如下:
```python
class WorkflowShareItem(BaseModel):
id: Optional[int] = None
share_title: Optional[str] = None
share_comment: Optional[str] = None
share_user: Optional[str] = None
share_uid: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
timer: Optional[str] = None
actions: Optional[str] = None # JSON字符串
flows: Optional[str] = None # JSON字符串
context: Optional[str] = None # JSON字符串
date: Optional[str] = None
# 工作流分享相关接口
@App.post("/workflow/share")
def workflow_share(workflow: WorkflowShareItem, db: Session = Depends(get_db)):
"""新增工作流分享"""
@App.delete("/workflow/share/{sid}")
def workflow_share_delete(sid: int, share_uid: str, db: Session = Depends(get_db)):
"""删除工作流分享"""
@App.get("/workflow/shares")
def workflow_shares(name: str = None, page: int = 1, count: int = 30, db: Session = Depends(get_db)):
"""查询分享的工作流"""
@App.get("/workflow/fork/{shareid}")
def workflow_fork(shareid: int, db: Session = Depends(get_db)):
"""复用分享的工作流"""
```
## 使用说明
1. **启用功能**: 确保 `WORKFLOW_STATISTIC_SHARE` 配置为 `true`
2. **分享工作流**: 通过API接口分享本地工作流到公共服务器
3. **查看分享**: 查询公共服务器上的工作流分享
4. **复用工作流**: 将其他用户分享的工作流复制到本地使用
5. **管理分享**: 删除自己分享的工作流
## 技术改进
1. **独立配置**: 工作流分享功能使用独立的配置开关,不再依赖订阅分享配置
2. **数据访问层**: 使用WorkflowOper进行数据库操作提高代码的可维护性和一致性
3. **错误处理**: 完善的错误处理和参数验证
4. **类型安全**: 修复了所有类型相关的linter错误
## 注意事项
1. 工作流分享功能需要网络连接才能访问公共服务器
2. 复用的工作流默认状态为暂停,需要手动启用
3. 工作流名称不能重复,复用时会检查本地是否存在同名工作流
4. 分享的工作流数据以JSON字符串形式存储包含actions、flows、context等字段

View File

@@ -1,21 +1,21 @@
import json
from datetime import datetime
from typing import List, Any, Optional
import json
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.chain.workflow import WorkflowChain
from app.core.config import global_vars
from app.core.plugin import PluginManager
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.models import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
from app.helper.workflow import WorkflowHelper
from app.scheduler import Scheduler
router = APIRouter()
@@ -95,8 +95,8 @@ def update_workflow(workflow: schemas.Workflow,
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除工作流
"""
@@ -116,18 +116,18 @@ def delete_workflow(workflow_id: int,
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
workflow_share: schemas.WorkflowShare,
workflow: schemas.WorkflowShare,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
分享工作流
"""
if not workflow_share.id or not workflow_share.share_title or not workflow_share.share_user:
if not workflow.id or not workflow.share_title or not workflow.share_user:
return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow_share.id,
share_title=workflow_share.share_title or "",
share_comment=workflow_share.share_comment or "",
share_user=workflow_share.share_user or "")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow.id,
share_title=workflow.share_title or "",
share_comment=workflow.share_comment or "",
share_user=workflow.share_user or "")
return schemas.Response(success=state, message=errmsg)
@@ -144,56 +144,54 @@ def workflow_share_delete(
@router.post("/fork", summary="复用工作流", response_model=schemas.Response)
def workflow_fork(
workflow_share: schemas.WorkflowShare,
workflow: schemas.WorkflowShare,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_current_active_user)) -> Any:
_: schemas.User = Depends(get_current_active_user)) -> Any:
"""
复用工作流
"""
if not workflow_share.name:
if not workflow.name:
return schemas.Response(success=False, message="工作流名称不能为空")
# 解析JSON数据添加错误处理
try:
actions = json.loads(workflow_share.actions or "[]")
actions = json.loads(workflow.actions or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="actions字段JSON格式错误")
try:
flows = json.loads(workflow_share.flows or "[]")
flows = json.loads(workflow.flows or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="flows字段JSON格式错误")
try:
context = json.loads(workflow_share.context or "{}")
context = json.loads(workflow.context or "{}")
except json.JSONDecodeError:
return schemas.Response(success=False, message="context字段JSON格式错误")
# 创建工作流
workflow_dict = {
"name": workflow_share.name,
"description": workflow_share.description,
"timer": workflow_share.timer,
"name": workflow.name,
"description": workflow.description,
"timer": workflow.timer,
"actions": actions,
"flows": flows,
"context": context,
"state": "P" # 默认暂停状态
}
# 检查名称是否重复
from app.db.workflow_oper import WorkflowOper
if WorkflowOper(db).get_by_name(workflow_dict["name"]):
if Workflow.get_by_name(db, workflow_dict["name"]):
return schemas.Response(success=False, message="已存在相同名称的工作流")
# 创建新工作流
from app.db.models.workflow import Workflow as WorkflowModel
workflow = WorkflowModel(**workflow_dict)
workflow = Workflow(**workflow_dict)
workflow.create(db)
# 更新复用次数
if workflow_share.id:
WorkflowHelper().workflow_fork(share_id=workflow_share.id)
return schemas.Response(success=True, message="复用成功")

View File

@@ -201,8 +201,6 @@ class ConfigModel(BaseModel):
SUBSCRIBE_RSS_INTERVAL: int = 30
# 订阅数据共享
SUBSCRIBE_STATISTIC_SHARE: bool = True
# 工作流数据共享
WORKFLOW_STATISTIC_SHARE: bool = True
# 订阅搜索开关
SUBSCRIBE_SEARCH: bool = False
# 检查本地媒体库是否存在资源开关
@@ -313,6 +311,8 @@ class ConfigModel(BaseModel):
DEFAULT_SUB: Optional[str] = "zh-cn"
# Docker Client API地址
DOCKER_CLIENT_API: Optional[str] = "tcp://127.0.0.1:38379"
# 工作流数据共享
WORKFLOW_STATISTIC_SHARE: bool = True
class Settings(BaseSettings, ConfigModel, LogConfigModel):

View File

@@ -1,13 +1,9 @@
from threading import Thread
from typing import List, Tuple, Optional
import json
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.workflow_oper import WorkflowOper
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
@@ -40,17 +36,12 @@ class WorkflowHelper(metaclass=WeakSingleton):
return False, "当前没有开启工作流数据共享功能"
# 获取工作流信息
from app.db import get_db
db = next(get_db())
try:
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return False, "工作流不存在"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id")
finally:
db.close()
workflow = WorkflowOper().get(workflow_id)
if not workflow:
return False, "工作流不存在"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id")
# 清除缓存
cache_backend.clear(region=self._shares_cache_region)