Add workflow sharing, forking, and related API endpoints

Co-authored-by: jxxghp <jxxghp@163.com>
This commit is contained in:
Cursor Agent
2025-07-08 15:18:01 +00:00
parent 9da9d765a0
commit bbffb1420b
3 changed files with 246 additions and 2 deletions

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from typing import List, Any, Optional
import json
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
@@ -14,6 +15,7 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
from app.helper.workflow import WorkflowHelper
router = APIRouter()
@@ -86,8 +88,8 @@ def update_workflow(workflow: schemas.Workflow,
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除工作流
"""
@@ -103,6 +105,92 @@ def delete_workflow(workflow_id: int,
return schemas.Response(success=True, message="删除成功")
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
workflow_share: schemas.WorkflowShare,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
分享工作流
"""
if not workflow_share.id or not workflow_share.share_title or not workflow_share.share_user:
return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow_share.id,
share_title=workflow_share.share_title or "",
share_comment=workflow_share.share_comment or "",
share_user=workflow_share.share_user or "")
return schemas.Response(success=state, message=errmsg)
@router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response)
def workflow_share_delete(
share_id: int,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除分享
"""
state, errmsg = WorkflowHelper().share_delete(share_id=share_id)
return schemas.Response(success=state, message=errmsg)
@router.post("/fork", summary="复用工作流", response_model=schemas.Response)
def workflow_fork(
workflow_share: schemas.WorkflowShare,
current_user: schemas.User = Depends(get_current_active_user)) -> Any:
"""
复用工作流
"""
# 获取分享的工作流数据
shares = WorkflowHelper().get_shares()
target_share = None
for share in shares:
if share.get("id") == workflow_share.id:
target_share = share
break
if not target_share:
return schemas.Response(success=False, message="分享的工作流不存在")
# 创建工作流
workflow_dict = {
"name": target_share.get("name"),
"description": target_share.get("description"),
"timer": target_share.get("timer"),
"actions": json.loads(target_share.get("actions", "[]")),
"flows": json.loads(target_share.get("flows", "[]")),
"context": json.loads(target_share.get("context", "{}")),
"state": "P" # 默认暂停状态
}
# 检查名称是否重复
db = next(get_db())
if Workflow.get_by_name(db, workflow_dict["name"]):
return schemas.Response(success=False, message="已存在相同名称的工作流")
# 创建新工作流
from app.db.models.workflow import Workflow as WorkflowModel
workflow = WorkflowModel(**workflow_dict)
workflow.create(db)
# 更新复用次数
if workflow_share.id:
WorkflowHelper().workflow_fork(share_id=workflow_share.id)
return schemas.Response(success=True, message="复用成功")
@router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare])
def workflow_shares(
name: Optional[str] = None,
page: Optional[int] = 1,
count: Optional[int] = 30,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
查询分享的工作流
"""
return WorkflowHelper().get_shares(name=name, page=page, count=count)
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workflow_id: int,
from_begin: Optional[bool] = True,

134
app/helper/workflow.py Normal file
View File

@@ -0,0 +1,134 @@
from threading import Thread
from typing import List, Tuple, Optional
import json
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.models.workflow import Workflow as WorkflowModel
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
class WorkflowHelper(metaclass=WeakSingleton):
"""
工作流分享等
"""
_workflow_share = f"{settings.MP_SERVER_HOST}/workflow/share"
_workflow_shares = f"{settings.MP_SERVER_HOST}/workflow/shares"
_workflow_fork = f"{settings.MP_SERVER_HOST}/workflow/fork/%s"
_shares_cache_region = "workflow_share"
_share_user_id = None
def __init__(self):
self.get_user_uuid()
def workflow_share(self, workflow_id: int,
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
"""
分享工作流
"""
if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置
return False, "当前没有开启数据共享功能"
# 获取工作流信息
from app.db import get_db
db = next(get_db())
workflow = WorkflowModel.get(db, workflow_id)
if not workflow:
return False, "工作流不存在"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id")
# 清除缓存
cache_backend.clear(region=self._shares_cache_region)
# 发送分享请求
res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json",
timeout=10).post(self._workflow_share,
json={
"share_title": share_title,
"share_comment": share_comment,
"share_user": share_user,
"share_uid": self._share_user_id,
**workflow_dict
})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def share_delete(self, share_id: int) -> Tuple[bool, str]:
"""
删除分享
"""
if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置
return False, "当前没有开启数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {},
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
params={"share_uid": self._share_user_id})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def workflow_fork(self, share_id: int) -> Tuple[bool, str]:
"""
复用分享的工作流
"""
if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置
return False, "当前没有开启数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={
"Content-Type": "application/json"
}).get_res(self._workflow_fork % share_id)
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
return True, ""
else:
return False, res.json().get("message")
@cached(region=_shares_cache_region)
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
"""
获取工作流分享数据
"""
if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置
return []
res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
"name": name,
"page": page,
"count": count
})
if res and res.status_code == 200:
return res.json()
return []
def get_user_uuid(self) -> str:
"""
获取用户uuid
"""
if not self._share_user_id:
self._share_user_id = SystemUtils.generate_user_unique_id()
logger.info(f"当前用户UUID: {self._share_user_id}")
return self._share_user_id or ""

View File

@@ -82,3 +82,25 @@ class ActionFlow(BaseModel):
source: Optional[str] = Field(default=None, description="源动作")
target: Optional[str] = Field(default=None, description="目标动作")
animated: Optional[bool] = Field(default=True, description="是否动画流程")
class WorkflowShare(BaseModel):
"""
工作流分享信息
"""
id: Optional[int] = Field(default=None, description="分享ID")
share_title: Optional[str] = Field(default=None, description="分享标题")
share_comment: Optional[str] = Field(default=None, description="分享说明")
share_user: Optional[str] = Field(default=None, description="分享人")
share_uid: Optional[str] = Field(default=None, description="分享人唯一ID")
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)")
flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)")
context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)")
date: Optional[str] = Field(default=None, description="分享时间")
count: Optional[int] = Field(default=0, description="复用人次")
class Config:
orm_mode = True