import shutil import time from pathlib import Path from typing import Tuple, Union from lxml import etree from app.chain.storage import StorageChain from app.core.config import settings from app.core.context import Context from app.db.site_oper import SiteOper from app.helper.sites import SitesHelper # noqa from app.helper.torrent import TorrentHelper from app.log import logger from app.modules import _ModuleBase from app.modules.indexer.spider.mtorrent import MTorrentSpider from app.schemas import TorrentInfo from app.schemas.file import FileURI from app.schemas.types import ModuleType, OtherModulesType from app.utils.http import RequestUtils from app.utils.string import StringUtils from app.utils.system import SystemUtils class SubtitleModule(_ModuleBase): """ 字幕下载模块 """ # 站点详情页字幕下载链接识别XPATH _SITE_SUBTITLE_XPATH = [ '//td[@class="rowhead"][text()="字幕"]/following-sibling::td//a[not(@class)]/@href', '//td[@class="rowhead"][text()="字幕"]/following-sibling::td//a/@href', '//div[contains(@class, "font-bold")][text()="字幕"]/following-sibling::div[1]//a[not(@class)]/@href', # 憨憨 ] def init_module(self) -> None: pass @staticmethod def get_name() -> str: return "站点字幕" @staticmethod def get_type() -> ModuleType: """ 获取模块类型 """ return ModuleType.Other @staticmethod def get_subtype() -> OtherModulesType: """ 获取模块子类型 """ return OtherModulesType.Subtitle @staticmethod def get_priority() -> int: """ 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 """ return 0 def init_setting(self) -> Tuple[str, Union[str, bool]]: pass def stop(self) -> None: pass def test(self): pass def _get_subtitle_links(self, torrent: TorrentInfo): """ 获取字幕链接 """ # API请求方式的站点需要特殊处理 if torrent.site is not None: site = SiteOper().get(torrent.site) if indexer := SitesHelper().get_indexer(site.domain): if indexer.get("parser") == "mTorrent": return MTorrentSpider(indexer).get_subtitle_links( torrent.page_url ) # TODO 其它采用API访问的站点 # 普通站点通过解析网站代码的方式获取 request = RequestUtils( cookies=torrent.site_cookie, ua=torrent.site_ua, proxies=settings.PROXY if torrent.site_proxy else None, ) res = request.get_res(torrent.page_url) if res and res.status_code == 200: if not res.text: logger.warn(f"读取页面代码失败:{torrent.page_url}") return [] html = etree.HTML(res.text) try: sublink_list = [] for xpath in self._SITE_SUBTITLE_XPATH: sublinks = html.xpath(xpath) if sublinks: for sublink in sublinks: if not sublink: continue if not sublink.startswith("http"): base_url = StringUtils.get_base_url(torrent.page_url) if sublink.startswith("/"): sublink = "%s%s" % (base_url, sublink) else: sublink = "%s/%s" % (base_url, sublink) sublink_list.append(sublink) # 已成功获取了链接,后续xpath可以忽略 break return sublink_list finally: if html is not None: del html elif res is not None: logger.warn(f"连接 {torrent.page_url} 失败,状态码:{res.status_code}") else: logger.warn(f"无法打开链接:{torrent.page_url}") return None def download_added(self, context: Context, download_dir: Path, torrent_content: Union[str, bytes] = None): """ 添加下载任务成功后,从站点下载字幕,保存到下载目录 :param context: 上下文,包括识别信息、媒体信息、种子信息 :param download_dir: 下载目录 :param torrent_content: 种子内容,如果是种子文件,则为文件内容,否则为种子字符串 :return: None,该方法可被多个模块同时处理 """ if not settings.DOWNLOAD_SUBTITLE: return # 没有种子文件不处理 if not torrent_content: return # 没有详情页不处理 torrent = context.torrent_info if not torrent.page_url: return # 字幕下载目录 logger.info("开始从站点下载字幕:%s" % torrent.page_url) # 获取种子信息 folder_name, _ = TorrentHelper.get_fileinfo_from_torrent_content(torrent_content) # 文件保存目录,如果是单文件种子,则folder_name是空,此时文件保存目录就是下载目录 storageChain = StorageChain() # 等待目录存在 working_dir_item = None # split download_dir into storage and path fileURI = FileURI.from_uri(download_dir.as_posix()) storage = fileURI.storage download_dir = Path(fileURI.path) for _ in range(30): found = storageChain.get_file_item(storage, download_dir / folder_name) if found: working_dir_item = found break time.sleep(1) # 目录仍然不存在,且有文件夹名,则创建目录 if not working_dir_item and folder_name: parent_dir_item = storageChain.get_file_item(storage, download_dir) if parent_dir_item: working_dir_item = storageChain.create_folder( parent_dir_item, folder_name ) else: logger.error(f"下载根目录不存在,无法创建字幕文件夹:{download_dir}") return if not working_dir_item: logger.error(f"下载目录不存在,无法保存字幕:{download_dir / folder_name}") return # 读取网站代码 sublink_list = self._get_subtitle_links(torrent) if not sublink_list: logger.warn(f"{torrent.page_url} 页面未找到字幕下载链接") return # 下载所有字幕文件 request = RequestUtils( cookies=torrent.site_cookie, ua=torrent.site_ua, proxies=settings.PROXY if torrent.site_proxy else None, ) for sublink in sublink_list: logger.info(f"找到字幕下载链接:{sublink},开始下载...") # 下载 ret = request.get_res(sublink) if ret and ret.status_code == 200: # 保存ZIP file_name = TorrentHelper.get_url_filename(ret, sublink) if not file_name: logger.warn(f"链接不是字幕文件:{sublink}") continue if file_name.lower().endswith(".zip"): # ZIP包 zip_file = settings.TEMP_PATH / file_name # 保存 zip_file.write_bytes(ret.content) # 解压路径 zip_path = zip_file.with_name(zip_file.stem) # 解压文件 shutil.unpack_archive(zip_file, zip_path, format='zip') # 遍历转移文件 for sub_file in SystemUtils.list_files(zip_path, settings.RMT_SUBEXT): target_sub_file = Path(working_dir_item.path) / Path(sub_file.name) if storageChain.get_file_item(storage, target_sub_file): logger.info(f"字幕文件已存在:{target_sub_file}") continue logger.info(f"转移字幕 {sub_file} 到 {target_sub_file} ...") storageChain.upload_file(working_dir_item, sub_file) # 删除临时文件 try: shutil.rmtree(zip_path) zip_file.unlink() except Exception as err: logger.error(f"删除临时文件失败:{str(err)}") else: sub_file = settings.TEMP_PATH / file_name # 保存 sub_file.write_bytes(ret.content) target_sub_file = Path(working_dir_item.path) / Path(sub_file.name) if storageChain.get_file_item(storage, target_sub_file): logger.info(f"字幕文件已存在:{target_sub_file}") continue logger.info(f"转移字幕 {sub_file} 到 {target_sub_file} ...") storageChain.upload_file(working_dir_item, sub_file) else: logger.error(f"下载字幕文件失败:{sublink}") continue logger.info(f"{torrent.page_url} 页面字幕下载完成")