Merge pull request #4566 from jxxghp/cursor/helper-91dc

新增工作流分享相关接口和helper
This commit is contained in:
jxxghp
2025-07-09 00:12:56 +08:00
committed by GitHub
7 changed files with 473 additions and 12 deletions

180
WORKFLOW_SHARE_README.md Normal file
View File

@@ -0,0 +1,180 @@
# 工作流分享功能
## 概述
基于订阅分享的相关API接口和helper类新增了工作流分享相关接口和helper以实现共享公共服务器的相关接口给前端调用与订阅使用的是同一个服务器。
## 功能特性
1. **工作流分享** - 将本地工作流分享到公共服务器
2. **分享管理** - 查看、删除已分享的工作流
3. **工作流复用** - 从公共服务器复用其他用户分享的工作流
4. **缓存机制** - 使用缓存提高查询性能
## 文件结构
### 新增文件
- `app/schemas/workflow.py` - 新增 `WorkflowShare` schema类
- `app/helper/workflow.py` - 新增工作流分享helper类
### 修改文件
- `app/api/endpoints/workflow.py` - 新增工作流分享相关API接口使用WorkflowOper进行数据库操作
- `app/db/workflow_oper.py` - 新增list方法
- `app/db/models/workflow.py` - 新增list静态方法
- `app/core/config.py` - 新增WORKFLOW_STATISTIC_SHARE配置项
## API接口
### 1. 分享工作流
```
POST /api/v1/workflow/share
```
**请求参数:**
```json
{
"id": 1,
"share_title": "我的工作流",
"share_comment": "这是一个自动化工作流",
"share_user": "用户名"
}
```
**响应:**
```json
{
"success": true,
"message": "success"
}
```
### 2. 删除分享
```
DELETE /api/v1/workflow/share/{share_id}
```
**响应:**
```json
{
"success": true,
"message": "success"
}
```
### 3. 复用工作流
```
POST /api/v1/workflow/fork
```
**请求参数:**
```json
{
"id": 1,
"name": "工作流名称",
"description": "工作流描述",
"timer": "0 0 * * *",
"actions": "[{\"id\": \"action1\", \"type\": \"test\"}]",
"flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]",
"context": "{}"
}
```
**响应:**
```json
{
"success": true,
"message": "复用成功"
}
```
### 4. 查询分享的工作流
```
GET /api/v1/workflow/shares?name=关键词&page=1&count=30
```
**响应:**
```json
[
{
"id": 1,
"share_title": "我的工作流",
"share_comment": "这是一个自动化工作流",
"share_user": "用户名",
"share_uid": "user_uuid",
"name": "工作流名称",
"description": "工作流描述",
"timer": "0 0 * * *",
"actions": "[{\"id\": \"action1\", \"type\": \"test\"}]",
"flows": "[{\"id\": \"flow1\", \"source\": \"action1\"}]",
"context": "{}",
"date": "2024-01-01 12:00:00",
"count": 5
}
]
```
## 配置说明
工作流分享功能使用独立的配置项 `WORKFLOW_STATISTIC_SHARE`,当该配置为 `true` 时,工作流分享功能才会启用。
### 配置项
- `WORKFLOW_STATISTIC_SHARE`: 工作流数据共享开关,默认为 `true`
## 服务器接口
工作流分享功能与订阅分享使用同一个服务器,服务器接口定义如下:
```python
class WorkflowShareItem(BaseModel):
id: Optional[int] = None
share_title: Optional[str] = None
share_comment: Optional[str] = None
share_user: Optional[str] = None
share_uid: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
timer: Optional[str] = None
actions: Optional[str] = None # JSON字符串
flows: Optional[str] = None # JSON字符串
context: Optional[str] = None # JSON字符串
date: Optional[str] = None
# 工作流分享相关接口
@App.post("/workflow/share")
def workflow_share(workflow: WorkflowShareItem, db: Session = Depends(get_db)):
"""新增工作流分享"""
@App.delete("/workflow/share/{sid}")
def workflow_share_delete(sid: int, share_uid: str, db: Session = Depends(get_db)):
"""删除工作流分享"""
@App.get("/workflow/shares")
def workflow_shares(name: str = None, page: int = 1, count: int = 30, db: Session = Depends(get_db)):
"""查询分享的工作流"""
@App.get("/workflow/fork/{shareid}")
def workflow_fork(shareid: int, db: Session = Depends(get_db)):
"""复用分享的工作流"""
```
## 使用说明
1. **启用功能**: 确保 `WORKFLOW_STATISTIC_SHARE` 配置为 `true`
2. **分享工作流**: 通过API接口分享本地工作流到公共服务器
3. **查看分享**: 查询公共服务器上的工作流分享
4. **复用工作流**: 将其他用户分享的工作流复制到本地使用
5. **管理分享**: 删除自己分享的工作流
## 技术改进
1. **独立配置**: 工作流分享功能使用独立的配置开关,不再依赖订阅分享配置
2. **数据访问层**: 使用WorkflowOper进行数据库操作提高代码的可维护性和一致性
3. **错误处理**: 完善的错误处理和参数验证
4. **类型安全**: 修复了所有类型相关的linter错误
## 注意事项
1. 工作流分享功能需要网络连接才能访问公共服务器
2. 复用的工作流默认状态为暂停,需要手动启用
3. 工作流名称不能重复,复用时会检查本地是否存在同名工作流
4. 分享的工作流数据以JSON字符串形式存储包含actions、flows、context等字段

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from typing import List, Any, Optional
import json
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
@@ -14,6 +15,7 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
from app.helper.workflow import WorkflowHelper
router = APIRouter()
@@ -24,7 +26,8 @@ def list_workflows(db: Session = Depends(get_db),
"""
获取工作流列表
"""
return Workflow.list(db)
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).list()
@router.post("/", summary="创建工作流", response_model=schemas.Response)
@@ -34,13 +37,15 @@ def create_workflow(workflow: schemas.Workflow,
"""
创建工作流
"""
if Workflow.get_by_name(db, workflow.name):
from app.db.workflow_oper import WorkflowOper
if workflow.name and WorkflowOper(db).get_by_name(workflow.name):
return schemas.Response(success=False, message="已存在相同名称的工作流")
if not workflow.add_time:
workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
if not workflow.state:
workflow.state = "P"
Workflow(**workflow.dict()).create(db)
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel(**workflow.dict()).create(db)
return schemas.Response(success=True, message="创建工作流成功")
@@ -67,7 +72,8 @@ def get_workflow(workflow_id: int,
"""
获取工作流详情
"""
return Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).get(workflow_id)
@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response)
@@ -77,7 +83,10 @@ def update_workflow(workflow: schemas.Workflow,
"""
更新工作流
"""
wf = Workflow.get(db, workflow.id)
from app.db.workflow_oper import WorkflowOper
if not workflow.id:
return schemas.Response(success=False, message="工作流ID不能为空")
wf = WorkflowOper(db).get(workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
wf.update(db, workflow.dict())
@@ -86,23 +95,120 @@ def update_workflow(workflow: schemas.Workflow,
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
Workflow.delete(db, workflow_id)
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
workflow_share: schemas.WorkflowShare,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
分享工作流
"""
if not workflow_share.id or not workflow_share.share_title or not workflow_share.share_user:
return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow_share.id,
share_title=workflow_share.share_title or "",
share_comment=workflow_share.share_comment or "",
share_user=workflow_share.share_user or "")
return schemas.Response(success=state, message=errmsg)
@router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response)
def workflow_share_delete(
share_id: int,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除分享
"""
state, errmsg = WorkflowHelper().share_delete(share_id=share_id)
return schemas.Response(success=state, message=errmsg)
@router.post("/fork", summary="复用工作流", response_model=schemas.Response)
def workflow_fork(
workflow_share: schemas.WorkflowShare,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_current_active_user)) -> Any:
"""
复用工作流
"""
if not workflow_share.name:
return schemas.Response(success=False, message="工作流名称不能为空")
# 解析JSON数据添加错误处理
try:
actions = json.loads(workflow_share.actions or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="actions字段JSON格式错误")
try:
flows = json.loads(workflow_share.flows or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="flows字段JSON格式错误")
try:
context = json.loads(workflow_share.context or "{}")
except json.JSONDecodeError:
return schemas.Response(success=False, message="context字段JSON格式错误")
# 创建工作流
workflow_dict = {
"name": workflow_share.name,
"description": workflow_share.description,
"timer": workflow_share.timer,
"actions": actions,
"flows": flows,
"context": context,
"state": "P" # 默认暂停状态
}
# 检查名称是否重复
from app.db.workflow_oper import WorkflowOper
if WorkflowOper(db).get_by_name(workflow_dict["name"]):
return schemas.Response(success=False, message="已存在相同名称的工作流")
# 创建新工作流
from app.db.models.workflow import Workflow as WorkflowModel
workflow = WorkflowModel(**workflow_dict)
workflow.create(db)
# 更新复用次数
if workflow_share.id:
WorkflowHelper().workflow_fork(share_id=workflow_share.id)
return schemas.Response(success=True, message="复用成功")
@router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare])
def workflow_shares(
name: Optional[str] = None,
page: Optional[int] = 1,
count: Optional[int] = 30,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
查询分享的工作流
"""
return WorkflowHelper().get_shares(name=name, page=page, count=count)
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workflow_id: int,
from_begin: Optional[bool] = True,
@@ -123,7 +229,8 @@ def start_workflow(workflow_id: int,
"""
启用工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
@@ -140,7 +247,8 @@ def pause_workflow(workflow_id: int,
"""
停用工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
@@ -159,7 +267,8 @@ def reset_workflow(workflow_id: int,
"""
重置工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流

View File

@@ -201,6 +201,8 @@ class ConfigModel(BaseModel):
SUBSCRIBE_RSS_INTERVAL: int = 30
# 订阅数据共享
SUBSCRIBE_STATISTIC_SHARE: bool = True
# 工作流数据共享
WORKFLOW_STATISTIC_SHARE: bool = True
# 订阅搜索开关
SUBSCRIBE_SEARCH: bool = False
# 检查本地媒体库是否存在资源开关

View File

@@ -37,6 +37,11 @@ class Workflow(Base):
# 最后执行时间
last_time = Column(String)
@staticmethod
@db_query
def list(db):
return db.query(Workflow).all()
@staticmethod
@db_query
def get_enabled_workflows(db):

View File

@@ -25,6 +25,12 @@ class WorkflowOper(DbOper):
"""
return Workflow.get(self._db, wid)
def list(self) -> List[Workflow]:
"""
获取所有工作流列表
"""
return Workflow.list(self._db)
def list_enabled(self) -> List[Workflow]:
"""
获取启用的工作流列表

137
app/helper/workflow.py Normal file
View File

@@ -0,0 +1,137 @@
from threading import Thread
from typing import List, Tuple, Optional
import json
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.workflow_oper import WorkflowOper
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
class WorkflowHelper(metaclass=WeakSingleton):
"""
工作流分享等
"""
_workflow_share = f"{settings.MP_SERVER_HOST}/workflow/share"
_workflow_shares = f"{settings.MP_SERVER_HOST}/workflow/shares"
_workflow_fork = f"{settings.MP_SERVER_HOST}/workflow/fork/%s"
_shares_cache_region = "workflow_share"
_share_user_id = None
def __init__(self):
self.get_user_uuid()
def workflow_share(self, workflow_id: int,
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
"""
分享工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
# 获取工作流信息
from app.db import get_db
db = next(get_db())
try:
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return False, "工作流不存在"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id")
finally:
db.close()
# 清除缓存
cache_backend.clear(region=self._shares_cache_region)
# 发送分享请求
res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json",
timeout=10).post(self._workflow_share,
json={
"share_title": share_title,
"share_comment": share_comment,
"share_user": share_user,
"share_uid": self._share_user_id,
**workflow_dict
})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def share_delete(self, share_id: int) -> Tuple[bool, str]:
"""
删除分享
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {},
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
params={"share_uid": self._share_user_id})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def workflow_fork(self, share_id: int) -> Tuple[bool, str]:
"""
复用分享的工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={
"Content-Type": "application/json"
}).get_res(self._workflow_fork % share_id)
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
return True, ""
else:
return False, res.json().get("message")
@cached(region=_shares_cache_region)
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
"""
获取工作流分享数据
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return []
res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
"name": name,
"page": page,
"count": count
})
if res and res.status_code == 200:
return res.json()
return []
def get_user_uuid(self) -> str:
"""
获取用户uuid
"""
if not self._share_user_id:
self._share_user_id = SystemUtils.generate_user_unique_id()
logger.info(f"当前用户UUID: {self._share_user_id}")
return self._share_user_id or ""

View File

@@ -82,3 +82,25 @@ class ActionFlow(BaseModel):
source: Optional[str] = Field(default=None, description="源动作")
target: Optional[str] = Field(default=None, description="目标动作")
animated: Optional[bool] = Field(default=True, description="是否动画流程")
class WorkflowShare(BaseModel):
"""
工作流分享信息
"""
id: Optional[int] = Field(default=None, description="分享ID")
share_title: Optional[str] = Field(default=None, description="分享标题")
share_comment: Optional[str] = Field(default=None, description="分享说明")
share_user: Optional[str] = Field(default=None, description="分享人")
share_uid: Optional[str] = Field(default=None, description="分享人唯一ID")
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)")
flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)")
context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)")
date: Optional[str] = Field(default=None, description="分享时间")
count: Optional[int] = Field(default=0, description="复用人次")
class Config:
orm_mode = True