mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-05 03:38:36 +08:00
fix subscribe helper
This commit is contained in:
@@ -60,43 +60,111 @@ class SubscribeHelper(metaclass=WeakSingleton):
|
||||
self.get_user_uuid()
|
||||
self.get_github_user()
|
||||
|
||||
@staticmethod
|
||||
def _check_subscribe_share_enabled() -> Tuple[bool, str]:
|
||||
"""
|
||||
检查订阅分享功能是否开启
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
return True, ""
|
||||
|
||||
@staticmethod
|
||||
def _validate_subscribe(subscribe) -> Tuple[bool, str]:
|
||||
"""
|
||||
验证订阅是否存在
|
||||
"""
|
||||
if not subscribe:
|
||||
return False, "订阅不存在"
|
||||
return True, ""
|
||||
|
||||
@staticmethod
|
||||
def _prepare_subscribe_data(subscribe) -> dict:
|
||||
"""
|
||||
准备订阅分享数据
|
||||
"""
|
||||
subscribe_dict = subscribe.to_dict()
|
||||
subscribe_dict.pop("id", None)
|
||||
return subscribe_dict
|
||||
|
||||
def _build_share_payload(self, share_title: str, share_comment: str,
|
||||
share_user: str, subscribe_dict: dict) -> dict:
|
||||
"""
|
||||
构建分享请求载荷
|
||||
"""
|
||||
return {
|
||||
"share_title": share_title,
|
||||
"share_comment": share_comment,
|
||||
"share_user": share_user,
|
||||
"share_uid": self._share_user_id,
|
||||
**subscribe_dict
|
||||
}
|
||||
|
||||
def _handle_response(self, res, clear_cache: bool = True) -> Tuple[bool, str]:
|
||||
"""
|
||||
处理HTTP响应
|
||||
"""
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
|
||||
# 检查响应状态
|
||||
if res and res.status_code == 200:
|
||||
# 清除缓存
|
||||
if clear_cache:
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
@staticmethod
|
||||
def _handle_list_response(res) -> List[dict]:
|
||||
"""
|
||||
处理返回List的HTTP响应
|
||||
"""
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=5, ttl=1800, skip_empty=True)
|
||||
def get_statistic(self, stype: str, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
|
||||
"""
|
||||
获取订阅统计数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_statistic, params={
|
||||
"stype": stype,
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=5, ttl=1800, skip_empty=True)
|
||||
async def async_get_statistic(self, stype: str, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
|
||||
"""
|
||||
异步获取订阅统计数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_statistic, params={
|
||||
"stype": stype,
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
def sub_reg(self, sub: dict) -> bool:
|
||||
"""
|
||||
新增订阅统计
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={
|
||||
"Content-Type": "application/json"
|
||||
@@ -109,7 +177,8 @@ class SubscribeHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
完成订阅统计
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={
|
||||
"Content-Type": "application/json"
|
||||
@@ -138,7 +207,8 @@ class SubscribeHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
上报存量订阅统计
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False
|
||||
subscribes = SubscribeOper().list()
|
||||
if not subscribes:
|
||||
@@ -157,146 +227,133 @@ class SubscribeHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
分享订阅
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
# 获取订阅信息
|
||||
subscribe = SubscribeOper().get(subscribe_id)
|
||||
if not subscribe:
|
||||
return False, "订阅不存在"
|
||||
subscribe_dict = subscribe.to_dict()
|
||||
subscribe_dict.pop("id")
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
|
||||
# 验证订阅
|
||||
valid, message = self._validate_subscribe(subscribe)
|
||||
if not valid:
|
||||
return False, message
|
||||
|
||||
# 准备数据
|
||||
subscribe_dict = self._prepare_subscribe_data(subscribe)
|
||||
payload = self._build_share_payload(share_title, share_comment, share_user, subscribe_dict)
|
||||
|
||||
# 发送分享请求
|
||||
res = RequestUtils(proxies=settings.PROXY, content_type="application/json",
|
||||
timeout=10).post(self._sub_share,
|
||||
json={
|
||||
"share_title": share_title,
|
||||
"share_comment": share_comment,
|
||||
"share_user": share_user,
|
||||
"share_uid": self._share_user_id,
|
||||
**subscribe_dict
|
||||
})
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.ok:
|
||||
# 清除 get_shares 的缓存,以便实时看到结果
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
timeout=10).post(self._sub_share, json=payload)
|
||||
|
||||
return self._handle_response(res)
|
||||
|
||||
async def async_sub_share(self, subscribe_id: int,
|
||||
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
异步分享订阅
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
# 获取订阅信息
|
||||
subscribe = await SubscribeOper().async_get(subscribe_id)
|
||||
if not subscribe:
|
||||
return False, "订阅不存在"
|
||||
subscribe_dict = subscribe.to_dict()
|
||||
subscribe_dict.pop("id")
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
|
||||
# 验证订阅
|
||||
valid, message = self._validate_subscribe(subscribe)
|
||||
if not valid:
|
||||
return False, message
|
||||
|
||||
# 准备数据
|
||||
subscribe_dict = self._prepare_subscribe_data(subscribe)
|
||||
payload = self._build_share_payload(share_title, share_comment, share_user, subscribe_dict)
|
||||
|
||||
# 发送分享请求
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY, content_type="application/json",
|
||||
timeout=10).post(self._sub_share,
|
||||
json={
|
||||
"share_title": share_title,
|
||||
"share_comment": share_comment,
|
||||
"share_user": share_user,
|
||||
"share_uid": self._share_user_id,
|
||||
**subscribe_dict
|
||||
})
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.status_code == 200:
|
||||
# 清除 get_shares 的缓存,以便实时看到结果
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
timeout=10).post(self._sub_share, json=payload)
|
||||
|
||||
return self._handle_response(res)
|
||||
|
||||
def share_delete(self, share_id: int) -> Tuple[bool, str]:
|
||||
"""
|
||||
删除分享
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
res = RequestUtils(proxies=settings.PROXY,
|
||||
timeout=5).delete_res(f"{self._sub_share}/{share_id}",
|
||||
params={"share_uid": self._share_user_id})
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.ok:
|
||||
# 清除 get_shares 的缓存,以便实时看到结果
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
return self._handle_response(res)
|
||||
|
||||
async def async_share_delete(self, share_id: int) -> Tuple[bool, str]:
|
||||
"""
|
||||
异步删除分享
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY,
|
||||
timeout=5).delete_res(f"{self._sub_share}/{share_id}",
|
||||
params={"share_uid": self._share_user_id})
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.status_code == 200:
|
||||
# 清除 get_shares 的缓存,以便实时看到结果
|
||||
cache_backend.clear(region=self._shares_cache_region)
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
return self._handle_response(res)
|
||||
|
||||
def sub_fork(self, share_id: int) -> Tuple[bool, str]:
|
||||
"""
|
||||
复用分享的订阅
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={
|
||||
"Content-Type": "application/json"
|
||||
}).get_res(self._sub_fork % share_id)
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.ok:
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
return self._handle_response(res, clear_cache=False)
|
||||
|
||||
async def async_sub_fork(self, share_id: int) -> Tuple[bool, str]:
|
||||
"""
|
||||
异步复用分享的订阅
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
return False, "当前没有开启订阅数据共享功能"
|
||||
# 检查功能是否开启
|
||||
enabled, message = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return False, message
|
||||
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5, headers={
|
||||
"Content-Type": "application/json"
|
||||
}).get_res(self._sub_fork % share_id)
|
||||
if res is None:
|
||||
return False, "连接MoviePilot服务器失败"
|
||||
if res.status_code == 200:
|
||||
return True, ""
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
return self._handle_response(res, clear_cache=False)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True)
|
||||
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
|
||||
"""
|
||||
获取订阅分享数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_shares, params={
|
||||
"name": name,
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True)
|
||||
async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \
|
||||
@@ -304,40 +361,43 @@ class SubscribeHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
异步获取订阅分享数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_shares, params={
|
||||
"name": name,
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True)
|
||||
def get_share_statistics(self) -> List[dict]:
|
||||
"""
|
||||
获取订阅分享统计数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_share_statistic)
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True)
|
||||
async def async_get_share_statistics(self) -> List[dict]:
|
||||
"""
|
||||
异步获取订阅分享统计数据
|
||||
"""
|
||||
if not settings.SUBSCRIBE_STATISTIC_SHARE:
|
||||
enabled, _ = self._check_subscribe_share_enabled()
|
||||
if not enabled:
|
||||
return []
|
||||
|
||||
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_share_statistic)
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
return self._handle_list_response(res)
|
||||
|
||||
def get_user_uuid(self) -> str:
|
||||
"""
|
||||
|
||||
@@ -3,12 +3,12 @@ from typing import List, Tuple, Optional
|
||||
|
||||
from app.core.cache import cached, cache_backend
|
||||
from app.core.config import settings
|
||||
from app.db.models import Workflow
|
||||
from app.db.workflow_oper import WorkflowOper
|
||||
from app.log import logger
|
||||
from app.utils.http import RequestUtils, AsyncRequestUtils
|
||||
from app.utils.singleton import WeakSingleton
|
||||
from app.utils.system import SystemUtils
|
||||
from db.models import Workflow
|
||||
|
||||
|
||||
class WorkflowHelper(metaclass=WeakSingleton):
|
||||
@@ -94,6 +94,15 @@ class WorkflowHelper(metaclass=WeakSingleton):
|
||||
else:
|
||||
return False, res.json().get("message")
|
||||
|
||||
@staticmethod
|
||||
def _handle_list_response(res) -> List[dict]:
|
||||
"""
|
||||
处理返回List的HTTP响应
|
||||
"""
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
|
||||
def workflow_share(self, workflow_id: int,
|
||||
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
@@ -228,9 +237,7 @@ class WorkflowHelper(metaclass=WeakSingleton):
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
return self._handle_list_response(res)
|
||||
|
||||
@cached(region=_shares_cache_region, maxsize=1, skip_empty=True)
|
||||
async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \
|
||||
@@ -247,9 +254,7 @@ class WorkflowHelper(metaclass=WeakSingleton):
|
||||
"page": page,
|
||||
"count": count
|
||||
})
|
||||
if res and res.status_code == 200:
|
||||
return res.json()
|
||||
return []
|
||||
return self._handle_list_response(res)
|
||||
|
||||
def get_user_uuid(self) -> str:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user