From d382eab355ce486527cff69c4455777c673a4e87 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 07:26:58 +0800 Subject: [PATCH] fix subscribe helper --- app/helper/subscribe.py | 278 ++++++++++++++++++++++++---------------- app/helper/workflow.py | 19 ++- 2 files changed, 181 insertions(+), 116 deletions(-) diff --git a/app/helper/subscribe.py b/app/helper/subscribe.py index d0c21bfe..d8b1d06a 100644 --- a/app/helper/subscribe.py +++ b/app/helper/subscribe.py @@ -60,43 +60,111 @@ class SubscribeHelper(metaclass=WeakSingleton): self.get_user_uuid() self.get_github_user() + @staticmethod + def _check_subscribe_share_enabled() -> Tuple[bool, str]: + """ + 检查订阅分享功能是否开启 + """ + if not settings.SUBSCRIBE_STATISTIC_SHARE: + return False, "当前没有开启订阅数据共享功能" + return True, "" + + @staticmethod + def _validate_subscribe(subscribe) -> Tuple[bool, str]: + """ + 验证订阅是否存在 + """ + if not subscribe: + return False, "订阅不存在" + return True, "" + + @staticmethod + def _prepare_subscribe_data(subscribe) -> dict: + """ + 准备订阅分享数据 + """ + subscribe_dict = subscribe.to_dict() + subscribe_dict.pop("id", None) + return subscribe_dict + + def _build_share_payload(self, share_title: str, share_comment: str, + share_user: str, subscribe_dict: dict) -> dict: + """ + 构建分享请求载荷 + """ + return { + "share_title": share_title, + "share_comment": share_comment, + "share_user": share_user, + "share_uid": self._share_user_id, + **subscribe_dict + } + + def _handle_response(self, res, clear_cache: bool = True) -> Tuple[bool, str]: + """ + 处理HTTP响应 + """ + if res is None: + return False, "连接MoviePilot服务器失败" + + # 检查响应状态 + if res and res.status_code == 200: + # 清除缓存 + if clear_cache: + cache_backend.clear(region=self._shares_cache_region) + return True, "" + else: + return False, res.json().get("message") + + @staticmethod + def _handle_list_response(res) -> List[dict]: + """ + 处理返回List的HTTP响应 + """ + if res and res.status_code == 200: + return res.json() + return [] + @cached(region=_shares_cache_region, maxsize=5, ttl=1800, skip_empty=True) def get_statistic(self, stype: str, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: """ 获取订阅统计数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_statistic, params={ "stype": stype, "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) @cached(region=_shares_cache_region, maxsize=5, ttl=1800, skip_empty=True) async def async_get_statistic(self, stype: str, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: """ 异步获取订阅统计数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_statistic, params={ "stype": stype, "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) def sub_reg(self, sub: dict) -> bool: """ 新增订阅统计 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return False res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={ "Content-Type": "application/json" @@ -109,7 +177,8 @@ class SubscribeHelper(metaclass=WeakSingleton): """ 完成订阅统计 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return False res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={ "Content-Type": "application/json" @@ -138,7 +207,8 @@ class SubscribeHelper(metaclass=WeakSingleton): """ 上报存量订阅统计 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return False subscribes = SubscribeOper().list() if not subscribes: @@ -157,146 +227,133 @@ class SubscribeHelper(metaclass=WeakSingleton): """ 分享订阅 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + + # 获取订阅信息 subscribe = SubscribeOper().get(subscribe_id) - if not subscribe: - return False, "订阅不存在" - subscribe_dict = subscribe.to_dict() - subscribe_dict.pop("id") - cache_backend.clear(region=self._shares_cache_region) + + # 验证订阅 + valid, message = self._validate_subscribe(subscribe) + if not valid: + return False, message + + # 准备数据 + subscribe_dict = self._prepare_subscribe_data(subscribe) + payload = self._build_share_payload(share_title, share_comment, share_user, subscribe_dict) + + # 发送分享请求 res = RequestUtils(proxies=settings.PROXY, content_type="application/json", - timeout=10).post(self._sub_share, - json={ - "share_title": share_title, - "share_comment": share_comment, - "share_user": share_user, - "share_uid": self._share_user_id, - **subscribe_dict - }) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + timeout=10).post(self._sub_share, json=payload) + + return self._handle_response(res) async def async_sub_share(self, subscribe_id: int, share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: """ 异步分享订阅 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + + # 获取订阅信息 subscribe = await SubscribeOper().async_get(subscribe_id) - if not subscribe: - return False, "订阅不存在" - subscribe_dict = subscribe.to_dict() - subscribe_dict.pop("id") - cache_backend.clear(region=self._shares_cache_region) + + # 验证订阅 + valid, message = self._validate_subscribe(subscribe) + if not valid: + return False, message + + # 准备数据 + subscribe_dict = self._prepare_subscribe_data(subscribe) + payload = self._build_share_payload(share_title, share_comment, share_user, subscribe_dict) + + # 发送分享请求 res = await AsyncRequestUtils(proxies=settings.PROXY, content_type="application/json", - timeout=10).post(self._sub_share, - json={ - "share_title": share_title, - "share_comment": share_comment, - "share_user": share_user, - "share_uid": self._share_user_id, - **subscribe_dict - }) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + timeout=10).post(self._sub_share, json=payload) + + return self._handle_response(res) def share_delete(self, share_id: int) -> Tuple[bool, str]: """ 删除分享 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + res = RequestUtils(proxies=settings.PROXY, timeout=5).delete_res(f"{self._sub_share}/{share_id}", params={"share_uid": self._share_user_id}) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res) async def async_share_delete(self, share_id: int) -> Tuple[bool, str]: """ 异步删除分享 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5).delete_res(f"{self._sub_share}/{share_id}", params={"share_uid": self._share_user_id}) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - # 清除 get_shares 的缓存,以便实时看到结果 - cache_backend.clear(region=self._shares_cache_region) - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res) def sub_fork(self, share_id: int) -> Tuple[bool, str]: """ 复用分享的订阅 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + res = RequestUtils(proxies=settings.PROXY, timeout=5, headers={ "Content-Type": "application/json" }).get_res(self._sub_fork % share_id) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.ok: - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res, clear_cache=False) async def async_sub_fork(self, share_id: int) -> Tuple[bool, str]: """ 异步复用分享的订阅 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: - return False, "当前没有开启订阅数据共享功能" + # 检查功能是否开启 + enabled, message = self._check_subscribe_share_enabled() + if not enabled: + return False, message + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5, headers={ "Content-Type": "application/json" }).get_res(self._sub_fork % share_id) - if res is None: - return False, "连接MoviePilot服务器失败" - if res.status_code == 200: - return True, "" - else: - return False, res.json().get("message") + + return self._handle_response(res, clear_cache=False) @cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True) def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]: """ 获取订阅分享数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_shares, params={ "name": name, "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) @cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True) async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \ @@ -304,40 +361,43 @@ class SubscribeHelper(metaclass=WeakSingleton): """ 异步获取订阅分享数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_shares, params={ "name": name, "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) @cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True) def get_share_statistics(self) -> List[dict]: """ 获取订阅分享统计数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_share_statistic) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) @cached(region=_shares_cache_region, maxsize=1, ttl=1800, skip_empty=True) async def async_get_share_statistics(self) -> List[dict]: """ 异步获取订阅分享统计数据 """ - if not settings.SUBSCRIBE_STATISTIC_SHARE: + enabled, _ = self._check_subscribe_share_enabled() + if not enabled: return [] + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_share_statistic) - if res and res.status_code == 200: - return res.json() - return [] + + return self._handle_list_response(res) def get_user_uuid(self) -> str: """ diff --git a/app/helper/workflow.py b/app/helper/workflow.py index 0bf5d6ef..f77471ca 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -3,12 +3,12 @@ from typing import List, Tuple, Optional from app.core.cache import cached, cache_backend from app.core.config import settings +from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.singleton import WeakSingleton from app.utils.system import SystemUtils -from db.models import Workflow class WorkflowHelper(metaclass=WeakSingleton): @@ -94,6 +94,15 @@ class WorkflowHelper(metaclass=WeakSingleton): else: return False, res.json().get("message") + @staticmethod + def _handle_list_response(res) -> List[dict]: + """ + 处理返回List的HTTP响应 + """ + if res and res.status_code == 200: + return res.json() + return [] + def workflow_share(self, workflow_id: int, share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]: """ @@ -228,9 +237,7 @@ class WorkflowHelper(metaclass=WeakSingleton): "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + return self._handle_list_response(res) @cached(region=_shares_cache_region, maxsize=1, skip_empty=True) async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \ @@ -247,9 +254,7 @@ class WorkflowHelper(metaclass=WeakSingleton): "page": page, "count": count }) - if res and res.status_code == 200: - return res.json() - return [] + return self._handle_list_response(res) def get_user_uuid(self) -> str: """