diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 4bfc8d2e..c0e44bc1 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import List, Any, Optional +import json from fastapi import APIRouter, Depends from sqlalchemy.orm import Session @@ -14,6 +15,7 @@ from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_user from app.chain.workflow import WorkflowChain from app.scheduler import Scheduler +from app.helper.workflow import WorkflowHelper router = APIRouter() @@ -86,8 +88,8 @@ def update_workflow(workflow: schemas.Workflow, @router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response) def delete_workflow(workflow_id: int, - db: Session = Depends(get_db), - _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: """ 删除工作流 """ @@ -103,6 +105,92 @@ def delete_workflow(workflow_id: int, return schemas.Response(success=True, message="删除成功") +@router.post("/share", summary="分享工作流", response_model=schemas.Response) +def workflow_share( + workflow_share: schemas.WorkflowShare, + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 分享工作流 + """ + if not workflow_share.id or not workflow_share.share_title or not workflow_share.share_user: + return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人") + + state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow_share.id, + share_title=workflow_share.share_title or "", + share_comment=workflow_share.share_comment or "", + share_user=workflow_share.share_user or "") + return schemas.Response(success=state, message=errmsg) + + +@router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response) +def workflow_share_delete( + share_id: int, + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 删除分享 + """ + state, errmsg = WorkflowHelper().share_delete(share_id=share_id) + return schemas.Response(success=state, message=errmsg) + + +@router.post("/fork", summary="复用工作流", response_model=schemas.Response) +def workflow_fork( + workflow_share: schemas.WorkflowShare, + current_user: schemas.User = Depends(get_current_active_user)) -> Any: + """ + 复用工作流 + """ + # 获取分享的工作流数据 + shares = WorkflowHelper().get_shares() + target_share = None + for share in shares: + if share.get("id") == workflow_share.id: + target_share = share + break + + if not target_share: + return schemas.Response(success=False, message="分享的工作流不存在") + + # 创建工作流 + workflow_dict = { + "name": target_share.get("name"), + "description": target_share.get("description"), + "timer": target_share.get("timer"), + "actions": json.loads(target_share.get("actions", "[]")), + "flows": json.loads(target_share.get("flows", "[]")), + "context": json.loads(target_share.get("context", "{}")), + "state": "P" # 默认暂停状态 + } + + # 检查名称是否重复 + db = next(get_db()) + if Workflow.get_by_name(db, workflow_dict["name"]): + return schemas.Response(success=False, message="已存在相同名称的工作流") + + # 创建新工作流 + from app.db.models.workflow import Workflow as WorkflowModel + workflow = WorkflowModel(**workflow_dict) + workflow.create(db) + + # 更新复用次数 + if workflow_share.id: + WorkflowHelper().workflow_fork(share_id=workflow_share.id) + + return schemas.Response(success=True, message="复用成功") + + +@router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare]) +def workflow_shares( + name: Optional[str] = None, + page: Optional[int] = 1, + count: Optional[int] = 30, + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 查询分享的工作流 + """ + return WorkflowHelper().get_shares(name=name, page=page, count=count) + + @router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response) def run_workflow(workflow_id: int, from_begin: Optional[bool] = True, diff --git a/app/helper/workflow.py b/app/helper/workflow.py new file mode 100644 index 00000000..fd427a47 --- /dev/null +++ b/app/helper/workflow.py @@ -0,0 +1,134 @@ +from threading import Thread +from typing import List, Tuple, Optional +import json + +from app.core.cache import cached, cache_backend +from app.core.config import settings +from app.db.models.workflow import Workflow as WorkflowModel +from app.db.systemconfig_oper import SystemConfigOper +from app.log import logger +from app.schemas.types import SystemConfigKey +from app.utils.http import RequestUtils +from app.utils.singleton import WeakSingleton +from app.utils.system import SystemUtils + + +class WorkflowHelper(metaclass=WeakSingleton): + """ + 工作流分享等 + """ + + _workflow_share = f"{settings.MP_SERVER_HOST}/workflow/share" + + _workflow_shares = f"{settings.MP_SERVER_HOST}/workflow/shares" + + _workflow_fork = f"{settings.MP_SERVER_HOST}/workflow/fork/%s" + + _shares_cache_region = "workflow_share" + + _share_user_id = None + + def __init__(self): + self.get_user_uuid() + + def workflow_share(self, workflow_id: int, + share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: + """ + 分享工作流 + """ + if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 + return False, "当前没有开启数据共享功能" + + # 获取工作流信息 + from app.db import get_db + db = next(get_db()) + workflow = WorkflowModel.get(db, workflow_id) + if not workflow: + return False, "工作流不存在" + + workflow_dict = workflow.to_dict() + workflow_dict.pop("id") + + # 清除缓存 + cache_backend.clear(region=self._shares_cache_region) + + # 发送分享请求 + res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json", + timeout=10).post(self._workflow_share, + json={ + "share_title": share_title, + "share_comment": share_comment, + "share_user": share_user, + "share_uid": self._share_user_id, + **workflow_dict + }) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.ok: + # 清除 get_shares 的缓存,以便实时看到结果 + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + + def share_delete(self, share_id: int) -> Tuple[bool, str]: + """ + 删除分享 + """ + if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 + return False, "当前没有开启数据共享功能" + + res = RequestUtils(proxies=settings.PROXY or {}, + timeout=5).delete_res(f"{self._workflow_share}/{share_id}", + params={"share_uid": self._share_user_id}) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.ok: + # 清除 get_shares 的缓存,以便实时看到结果 + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + + def workflow_fork(self, share_id: int) -> Tuple[bool, str]: + """ + 复用分享的工作流 + """ + if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 + return False, "当前没有开启数据共享功能" + + res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={ + "Content-Type": "application/json" + }).get_res(self._workflow_fork % share_id) + if res is None: + return False, "连接MoviePilot服务器失败" + if res.ok: + return True, "" + else: + return False, res.json().get("message") + + @cached(region=_shares_cache_region) + def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: + """ + 获取工作流分享数据 + """ + if not settings.SUBSCRIBE_STATISTIC_SHARE: # 复用订阅分享的配置 + return [] + + res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={ + "name": name, + "page": page, + "count": count + }) + if res and res.status_code == 200: + return res.json() + return [] + + def get_user_uuid(self) -> str: + """ + 获取用户uuid + """ + if not self._share_user_id: + self._share_user_id = SystemUtils.generate_user_unique_id() + logger.info(f"当前用户UUID: {self._share_user_id}") + return self._share_user_id or "" \ No newline at end of file diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 854d4d9e..ff7b041b 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -82,3 +82,25 @@ class ActionFlow(BaseModel): source: Optional[str] = Field(default=None, description="源动作") target: Optional[str] = Field(default=None, description="目标动作") animated: Optional[bool] = Field(default=True, description="是否动画流程") + + +class WorkflowShare(BaseModel): + """ + 工作流分享信息 + """ + id: Optional[int] = Field(default=None, description="分享ID") + share_title: Optional[str] = Field(default=None, description="分享标题") + share_comment: Optional[str] = Field(default=None, description="分享说明") + share_user: Optional[str] = Field(default=None, description="分享人") + share_uid: Optional[str] = Field(default=None, description="分享人唯一ID") + name: Optional[str] = Field(default=None, description="工作流名称") + description: Optional[str] = Field(default=None, description="工作流描述") + timer: Optional[str] = Field(default=None, description="定时器") + actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)") + flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)") + context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)") + date: Optional[str] = Field(default=None, description="分享时间") + count: Optional[int] = Field(default=0, description="复用人次") + + class Config: + orm_mode = True