From d8f10e9ac40ee812125a547162f27bb120487710 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 07:17:05 +0800 Subject: [PATCH] fix workflow helper --- app/helper/workflow.py | 204 +++++++++++++++++++++++------------------ 1 file changed, 117 insertions(+), 87 deletions(-) diff --git a/app/helper/workflow.py b/app/helper/workflow.py index c889f1f5..0bf5d6ef 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -8,6 +8,7 @@ from app.log import logger from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.singleton import WeakSingleton from app.utils.system import SystemUtils +from db.models import Workflow class WorkflowHelper(metaclass=WeakSingleton): @@ -28,170 +29,198 @@ class WorkflowHelper(metaclass=WeakSingleton): def __init__(self): self.get_user_uuid() - def workflow_share(self, workflow_id: int, - share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: + @staticmethod + def _check_workflow_share_enabled() -> Tuple[bool, str]: """ - 分享工作流 + 检查工作流分享功能是否开启 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + if not settings.WORKFLOW_STATISTIC_SHARE: return False, "当前没有开启工作流数据共享功能" + return True, "" - # 获取工作流信息 - workflow = WorkflowOper().get(workflow_id) + @staticmethod + def _validate_workflow(workflow: Workflow) -> Tuple[bool, str]: + """ + 验证工作流是否可以分享 + """ if not workflow: return False, "工作流不存在" if not workflow.actions or not workflow.flows: return False, "请分享有动作和流程的工作流" + return True, "" + + @staticmethod + def _prepare_workflow_data(workflow: Workflow) -> dict: + """ + 准备工作流分享数据 + """ workflow_dict = workflow.to_dict() workflow_dict.pop("id", None) workflow_dict.pop("context", None) workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or []) workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or []) + return workflow_dict + + def _build_share_payload(self, share_title: str, share_comment: str, + share_user: str, workflow_dict: dict) -> dict: + """ + 构建分享请求载荷 + """ + return { + "share_title": share_title, + "share_comment": share_comment, + "share_user": share_user, + "share_uid": self._share_user_id, + **workflow_dict + } + + def _handle_response(self, res, clear_cache: bool = True) -> Tuple[bool, str]: + """ + 处理HTTP响应 + """ + if res is None: + return False, "连接MoviePilot服务器失败" + + # 检查响应状态 + success = True if res.status_code == 200 else False + + if success: + # 清除缓存 + if clear_cache: + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + + def workflow_share(self, workflow_id: int, + share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: + """ + 分享工作流 + """ + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message + + # 获取工作流信息 + workflow = WorkflowOper().get(workflow_id) + + # 验证工作流 + valid, message = self._validate_workflow(workflow) + if not valid: + return False, message + + # 准备数据 + workflow_dict = self._prepare_workflow_data(workflow) + payload = self._build_share_payload(share_title, share_comment, share_user, workflow_dict) # 发送分享请求 res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json", - timeout=10).post(self._workflow_share, - json={ - "share_title": share_title, - "share_comment": share_comment, - "share_user": share_user, - "share_uid": self._share_user_id, - **workflow_dict - }) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + timeout=10).post(self._workflow_share, json=payload) + + return self._handle_response(res) async def async_workflow_share(self, workflow_id: int, share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: """ 异步分享工作流 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 - return False, "当前没有开启工作流数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message # 获取工作流信息 workflow = await WorkflowOper().async_get(workflow_id) - if not workflow: - return False, "工作流不存在" - if not workflow.actions or not workflow.flows: - return False, "请分享有动作和流程的工作流" + # 验证工作流 + valid, message = self._validate_workflow(workflow) + if not valid: + return False, message - workflow_dict = workflow.to_dict() - workflow_dict.pop("id", None) - workflow_dict.pop("context", None) - workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or []) - workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or []) + # 准备数据 + workflow_dict = self._prepare_workflow_data(workflow) + payload = self._build_share_payload(share_title, share_comment, share_user, workflow_dict) # 发送分享请求 res = await AsyncRequestUtils(proxies=settings.PROXY or {}, content_type="application/json", - timeout=10).post(self._workflow_share, - json={ - "share_title": share_title, - "share_comment": share_comment, - "share_user": share_user, - "share_uid": self._share_user_id, - **workflow_dict - }) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + timeout=10).post(self._workflow_share, json=payload) + + return self._handle_response(res) def share_delete(self, share_id: int) -> Tuple[bool, str]: """ 删除分享 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 - return False, "当前没有开启工作流数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message res = RequestUtils(proxies=settings.PROXY or {}, timeout=5).delete_res(f"{self._workflow_share}/{share_id}", params={"share_uid": self._share_user_id}) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res) async def async_share_delete(self, share_id: int) -> Tuple[bool, str]: """ 异步删除分享 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 - return False, "当前没有开启工作流数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message res = await AsyncRequestUtils(proxies=settings.PROXY or {}, timeout=5).delete_res(f"{self._workflow_share}/{share_id}", params={"share_uid": self._share_user_id}) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res) def workflow_fork(self, share_id: int) -> Tuple[bool, str]: """ 复用分享的工作流 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 - return False, "当前没有开启工作流数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={ "Content-Type": "application/json" }).get_res(self._workflow_fork % share_id) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res, clear_cache=False) async def async_workflow_fork(self, share_id: int) -> Tuple[bool, str]: """ 异步复用分享的工作流 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 - return False, "当前没有开启工作流数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_workflow_share_enabled() + if not enabled: + return False, message res = await AsyncRequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={ "Content-Type": "application/json" }).get_res(self._workflow_fork % share_id) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res, clear_cache=False) @cached(region=_shares_cache_region, maxsize=1, skip_empty=True) def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: """ 获取工作流分享数据 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + enabled, _ = self._check_workflow_share_enabled() + if not enabled: return [] res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={ @@ -209,7 +238,8 @@ class WorkflowHelper(metaclass=WeakSingleton): """ 异步获取工作流分享数据 """ - if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关 + enabled, _ = self._check_workflow_share_enabled() + if not enabled: return [] res = await AsyncRequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={