diff --git a/app/modules/indexer/spider/mtorrent.py b/app/modules/indexer/spider/mtorrent.py index b694df56..54e292c2 100644 --- a/app/modules/indexer/spider/mtorrent.py +++ b/app/modules/indexer/spider/mtorrent.py @@ -2,6 +2,7 @@ import base64 import json import re from typing import Tuple, List, Optional +from urllib.parse import urlparse from app.core.config import settings from app.db.systemconfig_oper import SystemConfigOper @@ -25,6 +26,9 @@ class MTorrentSpider: _size = 100 _searchurl = "https://api.%s/api/torrent/search" _downloadurl = "https://api.%s/api/torrent/genDlToken" + _subtitle_list_url = "https://api.%s/api/subtitle/list" + _subtitle_genlink_url = "https://api.%s/api/subtitle/genlink" + _subtitle_download_url ="https://api.%s/api/subtitle/dlV2?credential=%s" _pageurl = "%sdetail/%s" _timeout = 15 @@ -262,3 +266,110 @@ class MTorrentSpider: # base64编码 base64_str = base64.b64encode(json.dumps(params).encode('utf-8')).decode('utf-8') return f"[{base64_str}]{url}" + + def parse_subtitle_links(self, page_url: str) -> List[str]: + """ + 解析指定页面的字幕下载链接 + + :param page_url: 种子详情页网址 + :type page_url: str + :return: 字幕下载链接 + :rtype: List[str] + """ + if not page_url: + return [] + # 从馒头的详情页网址中提取种子id + torrent_id = urlparse(page_url).path.rsplit("/", 1)[-1].strip() + if not torrent_id: + return [] + return self.get_subtitle_links_by_id(torrent_id) + + def get_subtitle_links_by_id(self, torrent_id: str) -> List[str]: + """ + 获取指定种子的字幕下载链接 + + :param torrent_id: 种子ID + :type torrent_id: str + :return: 字幕下载链接 + :rtype: List[str] + """ + results = [] + try: + for subtitle_id in self.__subtitle_ids(torrent_id) or []: + if link := self.__subtitle_genlink(subtitle_id): + results.append(link) + except Exception as e: + logger.error(f"{self._name} 获取字幕失败:{e}") + return results + + def __subtitle_ids(self, torrent_id: str) -> Optional[List[str]]: + """ + 获取指定种子的字幕列表 + + :param torrent_id: 种子ID + :type torrent_id: str + :return: 字幕ID + :rtype: List[str] | None + """ + url = self._subtitle_list_url % self._domain + # 发送请求 + res = RequestUtils( + headers={ + "Accept": "application/json, text/plain, */*", + "User-Agent": f"{self._ua}", + "x-api-key": self._apikey, + }, + proxies=self._proxy, + timeout=self._timeout, + ).post_res(url, data={"id": torrent_id}) + if res and res.status_code == 200: + result = res.json() + if int(result.get("code", -1)) == 0: + return [item["id"] for item in result.get("data", []) if "id" in item] + else: + logger.warn( + f"{self._name} 获取字幕列表失败,返回:{result.get("message", "未知")}" + ) + return None + elif res is not None: + logger.warn(f"{self._name} 获取字幕列表失败,错误码:{res.status_code}") + return None + else: + logger.warn(f"{self._name} 获取字幕列表失败,无法连接 {self._domain}") + return None + + def __subtitle_genlink(self, subtitle_id: str) -> Optional[str]: + """ + 获取字幕下载链接 + + :param subtitle_id: 字幕ID + :type subtitle_id: str + :return: 下载链接 + :rtype: str | None + """ + url = self._subtitle_genlink_url % self._domain + # 发送请求 + res = RequestUtils( + headers={ + "Accept": "application/json, text/plain, */*", + "User-Agent": f"{self._ua}", + "x-api-key": self._apikey, + }, + proxies=self._proxy, + timeout=self._timeout, + ).post_res(url, data={"id": subtitle_id}) + if res and res.status_code == 200: + result = res.json() + if int(result.get("code", -1)) == 0 and isinstance(result.get("data"), str): + return self._subtitle_download_url % (self._domain, result["data"]) + else: + logger.warn( + f"{self._name} 获取字幕下载链接失败,返回:{result.get("message", "未知")}" + ) + return None + elif res is not None: + logger.warn(f"{self._name} 获取字幕下载链接失败,错误码:{res.status_code}") + return None + else: + logger.warn(f"{self._name} 获取字幕下载链接失败,无法连接 {self._domain}") + return None