mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-03 02:25:32 +08:00
227 lines
9.2 KiB
Python
227 lines
9.2 KiB
Python
import shutil
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Tuple, Union
|
||
|
||
from lxml import etree
|
||
|
||
from app.chain.storage import StorageChain
|
||
from app.core.config import settings
|
||
from app.core.context import Context
|
||
from app.db.site_oper import SiteOper
|
||
from app.helper.sites import SitesHelper # noqa
|
||
from app.helper.torrent import TorrentHelper
|
||
from app.log import logger
|
||
from app.modules import _ModuleBase
|
||
from app.modules.indexer.spider.mtorrent import MTorrentSpider
|
||
from app.schemas import TorrentInfo
|
||
from app.schemas.file import FileURI
|
||
from app.schemas.types import ModuleType, OtherModulesType
|
||
from app.utils.http import RequestUtils
|
||
from app.utils.string import StringUtils
|
||
from app.utils.system import SystemUtils
|
||
|
||
|
||
class SubtitleModule(_ModuleBase):
    """
    Site subtitle download module.

    Once a download task has been added, this module looks for subtitle links
    on the torrent's detail page, downloads them and uploads the subtitle
    files into the task's download directory.
    """

    # XPath expressions that locate subtitle download links on a site's detail
    # page. They are tried in order and the first expression that matches wins.
    _SITE_SUBTITLE_XPATH = [
        '//td[@class="rowhead"][text()="字幕"]/following-sibling::td//a[not(@class)]/@href',
        '//td[@class="rowhead"][text()="字幕"]/following-sibling::td//a/@href',
        '//div[contains(@class, "font-bold")][text()="字幕"]/following-sibling::div[1]//a[not(@class)]/@href',  # div-based layout (tagged upstream for the "Hanhan" site)
    ]

    def init_module(self) -> None:
        """
        Module initialisation hook; this module needs no setup.
        """
        pass

    @staticmethod
    def get_name() -> str:
        """
        Return the display name of the module.
        """
        return "站点字幕"

    @staticmethod
    def get_type() -> ModuleType:
        """
        Return the module type.
        """
        return ModuleType.Other

    @staticmethod
    def get_subtype() -> OtherModulesType:
        """
        Return the module sub-type.
        """
        return OtherModulesType.Subtitle

    @staticmethod
    def get_priority() -> int:
        """
        Return the module priority. A smaller number means a higher priority;
        it only takes effect among modules serving the same interface.
        """
        return 0

    def init_setting(self) -> Tuple[str, Union[str, bool]]:
        """
        Settings hook; this module declares no setting switch.
        """
        pass

    def stop(self) -> None:
        """
        Shutdown hook; nothing to release.
        """
        pass

    def test(self):
        """
        Self-test hook; nothing to test.
        """
        pass

    def _get_subtitle_links(self, torrent: TorrentInfo):
        """
        Collect the subtitle download links of a torrent.

        Sites whose indexer uses the "mTorrent" parser are queried through
        MTorrentSpider; every other site has its detail page fetched and
        scanned with the XPath expressions in ``_SITE_SUBTITLE_XPATH``.

        :param torrent: torrent info; ``site``, ``page_url``, ``site_cookie``
                        and ``site_ua`` are read
        :return: for mTorrent sites, whatever
                 ``MTorrentSpider.get_subtitle_links`` returns; otherwise a
                 list of absolute links (empty when the page has no usable
                 content or no link matched), or None when the page could
                 not be fetched successfully
        """
        # Sites that are accessed through an API need special handling
        if torrent.site is not None:
            site = SiteOper().get(torrent.site)
            # Guard against the site lookup returning nothing: in that case
            # fall through to plain HTML parsing instead of raising on
            # ``site.domain``.
            indexer = SitesHelper().get_indexer(site.domain) if site else None
            if indexer and indexer.get("parser") == "mTorrent":
                return MTorrentSpider(indexer).get_subtitle_links(
                    torrent.page_url
                )
            # TODO: other sites that are accessed through an API

        # Ordinary sites: parse the detail page markup
        request = RequestUtils(cookies=torrent.site_cookie, ua=torrent.site_ua)
        res = request.get_res(torrent.page_url)
        if res and res.status_code == 200:
            if not res.text:
                logger.warn(f"读取页面代码失败:{torrent.page_url}")
                return []
            html = etree.HTML(res.text)
            if html is None:
                # The parser produced no document tree; nothing to query
                logger.warn(f"读取页面代码失败:{torrent.page_url}")
                return []
            try:
                sublink_list = []
                for xpath in self._SITE_SUBTITLE_XPATH:
                    sublinks = html.xpath(xpath)
                    if not sublinks:
                        continue
                    for sublink in sublinks:
                        if not sublink:
                            continue
                        if not sublink.startswith("http"):
                            # Turn a site-relative link into an absolute one
                            base_url = StringUtils.get_base_url(torrent.page_url)
                            if sublink.startswith("/"):
                                sublink = "%s%s" % (base_url, sublink)
                            else:
                                sublink = "%s/%s" % (base_url, sublink)
                        sublink_list.append(sublink)
                    # Links were found, the remaining expressions are skipped
                    break
                return sublink_list
            finally:
                # Drop the reference to the parsed tree as soon as possible
                del html
        elif res is not None:
            logger.warn(f"连接 {torrent.page_url} 失败,状态码:{res.status_code}")
        else:
            logger.warn(f"无法打开链接:{torrent.page_url}")
        return None

    @staticmethod
    def _upload_subtitle(storage_chain, storage, working_dir_item, sub_file: Path) -> None:
        """
        Upload one local subtitle file into the working directory, unless a
        file with the same name already exists there.

        :param storage_chain: StorageChain instance used for lookups/uploads
        :param storage: storage identifier taken from the download dir URI
        :param working_dir_item: file item of the target directory
        :param sub_file: local path of the subtitle file
        """
        target_sub_file = Path(working_dir_item.path) / Path(sub_file.name)
        if storage_chain.get_file_item(storage, target_sub_file):
            logger.info(f"字幕文件已存在:{target_sub_file}")
            return
        logger.info(f"转移字幕 {sub_file} 到 {target_sub_file} ...")
        storage_chain.upload_file(working_dir_item, sub_file)

    @staticmethod
    def _remove_temp_paths(*paths: Path) -> None:
        """
        Best-effort removal of temporary files/directories; a failure is
        logged and never raised.

        :param paths: local paths to remove; paths that no longer exist are
                      skipped
        """
        try:
            for path in paths:
                if path.is_dir():
                    shutil.rmtree(path)
                elif path.exists():
                    path.unlink()
        except Exception as err:
            logger.error(f"删除临时文件失败:{str(err)}")

    def download_added(self, context: Context, download_dir: Path, torrent_content: Union[str, bytes] = None):
        """
        After a download task has been added successfully, download the
        subtitles from the site and save them into the download directory.

        :param context: context holding recognition, media and torrent info
        :param download_dir: download directory
        :param torrent_content: torrent content; the file content for a
                                torrent file, otherwise the torrent string
        :return: None, this method may be handled by several modules
        """
        if not settings.DOWNLOAD_SUBTITLE:
            return

        # Nothing to do without torrent content
        if not torrent_content:
            return

        # Nothing to do without a detail page
        torrent = context.torrent_info
        if not torrent.page_url:
            return

        logger.info("开始从站点下载字幕:%s" % torrent.page_url)
        # Read the torrent's folder name. Per the upstream note it is empty
        # for a single-file torrent, in which case the files are saved
        # directly into the download directory.
        folder_name, _ = TorrentHelper().get_fileinfo_from_torrent_content(torrent_content)
        storage_chain = StorageChain()

        # Split download_dir into storage and path
        file_uri = FileURI.from_uri(download_dir.as_posix())
        storage = file_uri.storage
        download_dir = Path(file_uri.path)

        # Wait (up to 30 one-second polls) for the directory to appear
        working_dir_item = None
        for _ in range(30):
            found = storage_chain.get_file_item(storage, download_dir / folder_name)
            if found:
                working_dir_item = found
                break
            time.sleep(1)

        # Still missing but a folder name is known: create the folder
        if not working_dir_item and folder_name:
            parent_dir_item = storage_chain.get_file_item(storage, download_dir)
            if not parent_dir_item:
                logger.error(f"下载根目录不存在,无法创建字幕文件夹:{download_dir}")
                return
            working_dir_item = storage_chain.create_folder(
                parent_dir_item,
                folder_name
            )
        if not working_dir_item:
            logger.error(f"下载目录不存在,无法保存字幕:{download_dir / folder_name}")
            return

        # Read the subtitle links from the site
        sublink_list = self._get_subtitle_links(torrent)
        if not sublink_list:
            logger.warn(f"{torrent.page_url} 页面未找到字幕下载链接")
            return

        # Download every subtitle file
        request = RequestUtils(cookies=torrent.site_cookie, ua=torrent.site_ua)
        for sublink in sublink_list:
            logger.info(f"找到字幕下载链接:{sublink},开始下载...")
            ret = request.get_res(sublink)
            if not (ret and ret.status_code == 200):
                logger.error(f"下载字幕文件失败:{sublink}")
                continue

            file_name = TorrentHelper.get_url_filename(ret, sublink)
            # The name is derived from the remote response/URL; keep only its
            # final component so it cannot point outside the temp directory.
            file_name = Path(file_name).name if file_name else None
            if not file_name or file_name == "..":
                logger.warn(f"链接不是字幕文件:{sublink}")
                continue

            if file_name.lower().endswith(".zip"):
                # ZIP package: save, unpack, then transfer the subtitle files
                zip_file = settings.TEMP_PATH / file_name
                zip_path = zip_file.with_name(zip_file.stem)
                try:
                    zip_file.write_bytes(ret.content)
                    try:
                        shutil.unpack_archive(zip_file, zip_path, format='zip')
                    except Exception as err:
                        # A broken archive must not abort the remaining links
                        logger.error(f"解压字幕文件失败:{zip_file},{str(err)}")
                        continue
                    for sub_file in SystemUtils.list_files(zip_path, settings.RMT_SUBEXT):
                        self._upload_subtitle(storage_chain, storage, working_dir_item, sub_file)
                finally:
                    # Remove the temporary files whatever happened above
                    self._remove_temp_paths(zip_path, zip_file)
            else:
                # Plain subtitle file: save to the temp directory and transfer
                sub_file = settings.TEMP_PATH / file_name
                try:
                    sub_file.write_bytes(ret.content)
                    self._upload_subtitle(storage_chain, storage, working_dir_item, sub_file)
                finally:
                    # Same cleanup as the ZIP branch; a no-op if the upload
                    # already moved the file away.
                    self._remove_temp_paths(sub_file)

        logger.info(f"{torrent.page_url} 页面字幕下载完成")