mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
132 lines
5.0 KiB
Python
132 lines
5.0 KiB
Python
import json
|
|
from typing import List, Tuple, Optional
|
|
|
|
from app.core.cache import cached, cache_backend
|
|
from app.core.config import settings
|
|
from app.db.workflow_oper import WorkflowOper
|
|
from app.log import logger
|
|
from app.utils.http import RequestUtils
|
|
from app.utils.singleton import WeakSingleton
|
|
from app.utils.system import SystemUtils
|
|
|
|
|
|
class WorkflowHelper(metaclass=WeakSingleton):
|
|
"""
|
|
工作流分享等
|
|
"""
|
|
|
|
_workflow_share = f"{settings.MP_SERVER_HOST}/workflow/share"
|
|
|
|
_workflow_shares = f"{settings.MP_SERVER_HOST}/workflow/shares"
|
|
|
|
_workflow_fork = f"{settings.MP_SERVER_HOST}/workflow/fork/%s"
|
|
|
|
_shares_cache_region = "workflow_share"
|
|
|
|
_share_user_id = None
|
|
|
|
def __init__(self):
|
|
self.get_user_uuid()
|
|
|
|
def workflow_share(self, workflow_id: int,
|
|
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
|
|
"""
|
|
分享工作流
|
|
"""
|
|
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
|
|
return False, "当前没有开启工作流数据共享功能"
|
|
|
|
# 获取工作流信息
|
|
workflow = WorkflowOper().get(workflow_id)
|
|
if not workflow:
|
|
return False, "工作流不存在"
|
|
|
|
if not workflow.actions or not workflow.flows:
|
|
return False, "请分享有动作和流程的工作流"
|
|
|
|
workflow_dict = workflow.to_dict()
|
|
workflow_dict.pop("id", None)
|
|
workflow_dict.pop("context", None)
|
|
workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or [])
|
|
workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or [])
|
|
|
|
# 发送分享请求
|
|
res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json",
|
|
timeout=10).post(self._workflow_share,
|
|
json={
|
|
"share_title": share_title,
|
|
"share_comment": share_comment,
|
|
"share_user": share_user,
|
|
"share_uid": self._share_user_id,
|
|
**workflow_dict
|
|
})
|
|
if res is None:
|
|
return False, "连接MoviePilot服务器失败"
|
|
if res.ok:
|
|
# 清除 get_shares 的缓存,以便实时看到结果
|
|
cache_backend.clear(region=self._shares_cache_region)
|
|
return True, ""
|
|
else:
|
|
return False, res.json().get("message")
|
|
|
|
def share_delete(self, share_id: int) -> Tuple[bool, str]:
|
|
"""
|
|
删除分享
|
|
"""
|
|
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
|
|
return False, "当前没有开启工作流数据共享功能"
|
|
|
|
res = RequestUtils(proxies=settings.PROXY or {},
|
|
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
|
|
params={"share_uid": self._share_user_id})
|
|
if res is None:
|
|
return False, "连接MoviePilot服务器失败"
|
|
if res.ok:
|
|
# 清除 get_shares 的缓存,以便实时看到结果
|
|
cache_backend.clear(region=self._shares_cache_region)
|
|
return True, ""
|
|
else:
|
|
return False, res.json().get("message")
|
|
|
|
def workflow_fork(self, share_id: int) -> Tuple[bool, str]:
|
|
"""
|
|
复用分享的工作流
|
|
"""
|
|
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
|
|
return False, "当前没有开启工作流数据共享功能"
|
|
|
|
res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={
|
|
"Content-Type": "application/json"
|
|
}).get_res(self._workflow_fork % share_id)
|
|
if res is None:
|
|
return False, "连接MoviePilot服务器失败"
|
|
if res.ok:
|
|
return True, ""
|
|
else:
|
|
return False, res.json().get("message")
|
|
|
|
@cached(region=_shares_cache_region)
|
|
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
|
|
"""
|
|
获取工作流分享数据
|
|
"""
|
|
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
|
|
return []
|
|
|
|
res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
|
|
"name": name,
|
|
"page": page,
|
|
"count": count
|
|
})
|
|
if res and res.status_code == 200:
|
|
return res.json()
|
|
return []
|
|
|
|
def get_user_uuid(self) -> str:
|
|
"""
|
|
获取用户uuid
|
|
"""
|
|
if not self._share_user_id:
|
|
self._share_user_id = SystemUtils.generate_user_unique_id()
|
|
logger.info(f"当前用户UUID: {self._share_user_id}")
|
|
return self._share_user_id or "" |