mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
548 lines
24 KiB
Python
548 lines
24 KiB
Python
import datetime
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Tuple, Optional, List, Union, Dict, Any
|
||
from urllib.parse import unquote
|
||
|
||
from torrentool.api import Torrent
|
||
|
||
from app.core.cache import TTLCache, FileCache
|
||
from app.core.config import settings
|
||
from app.core.context import Context, TorrentInfo, MediaInfo
|
||
from app.core.meta import MetaBase
|
||
from app.core.metainfo import MetaInfo
|
||
from app.db.site_oper import SiteOper
|
||
from app.db.systemconfig_oper import SystemConfigOper
|
||
from app.log import logger
|
||
from app.schemas.types import MediaType, SystemConfigKey
|
||
from app.utils.http import RequestUtils
|
||
from app.utils.string import StringUtils
|
||
|
||
|
||
class TorrentHelper:
|
||
"""
|
||
种子帮助类
|
||
"""
|
||
|
||
def __init__(self):
    """
    Create the helper together with the TTL cache that records torrent URLs marked as invalid.
    """
    # Entries are kept for one day, expressed in seconds
    one_day_in_seconds = 24 * 3600
    self._invalid_torrents = TTLCache(region="invalid_torrents", maxsize=128, ttl=one_day_in_seconds)
|
||
|
||
def download_torrent(self, url: str,
                     cookie: Optional[str] = None,
                     ua: Optional[str] = None,
                     referer: Optional[str] = None,
                     proxy: Optional[bool] = False) \
        -> Tuple[Optional[Path], Optional[Union[str, bytes]], Optional[str], Optional[list], Optional[str]]:
    """
    Download a torrent, using the file cache when a valid copy is already stored.

    Redirects (301/302) are followed manually so that a redirect to a magnet link can be
    detected. At most 10 redirects are followed, a redirect without a ``Location`` header
    stops the chain, and relative ``Location`` values are resolved against the current URL.

    :param url: torrent download link or magnet link
    :param cookie: cookie string sent with the request
    :param ua: User-Agent sent with the request
    :param referer: Referer sent with the request
    :param proxy: whether to send the request through ``settings.PROXY``
    :return: cache relative path (used as the cache index), torrent content (or the magnet link),
             torrent main folder, torrent file list, error message
    """
    # Local import: the module header only imports ``unquote`` from urllib.parse
    from urllib.parse import urljoin

    if url.startswith("magnet:"):
        return None, url, "", [], "磁力链接"

    # Upper bound on followed redirects, protects against redirect loops
    max_redirects = 10

    def _new_request():
        # Every request of this download shares the same headers and proxy settings
        return RequestUtils(
            ua=ua,
            cookies=cookie,
            referer=referer,
            proxies=settings.PROXY if proxy else None
        )

    # Cache path of the torrent file, derived from the original URL
    cache_path = Path(StringUtils.md5_hash(url)).with_suffix(".torrent")
    # Cache backend
    cache_backend = FileCache()
    # Read the cached torrent file
    torrent_content = cache_backend.get(cache_path.as_posix(), region="torrents")
    if torrent_content:
        # A cached copy exists
        try:
            # Torrent folder and file list
            folder_name, file_list = self.get_fileinfo_from_torrent_content(torrent_content)
            # Nothing could be extracted: treat the cached file as invalid
            if not folder_name and not file_list:
                raise ValueError("无效的缓存种子文件")
            # Cached torrent is usable
            return cache_path, torrent_content, folder_name, file_list, ""
        except Exception as err:
            logger.error(f"处理缓存的种子文件 {cache_path} 时出错: {err},将重新下载")

    # Download the torrent file
    req = _new_request().get_res(url=url, allow_redirects=False)
    redirect_count = 0
    while req and req.status_code in (301, 302):
        location = req.headers.get('Location')
        if not location or redirect_count >= max_redirects:
            # Broken or endless redirect chain: fall through to the error handling below
            break
        redirect_count += 1
        # Absolute targets (including magnet links) are returned unchanged by urljoin
        url = urljoin(url, location)
        if url.startswith("magnet:"):
            return None, url, "", [], "获取到磁力链接"
        req = _new_request().get_res(url=url, allow_redirects=False)

    if req and req.status_code == 200:
        if not req.content:
            return cache_path, None, "", [], "未下载到种子数据"
        # Inspect the content format
        if req.content.startswith(b"magnet:"):
            # Magnet link
            return cache_path, req.text, "", [], "获取到磁力链接"
        if "下载种子文件".encode("utf-8") in req.content:
            # The site answered with its "first download" confirmation page
            skip_flag = False
            try:
                forms = re.findall(r'<form.*?action="(.*?)".*?>(.*?)</form>', req.text, re.S)
                for form in forms:
                    action = form[0]
                    if action != "?":
                        continue
                    # The confirmation form posts back to the current URL
                    action = url
                    inputs = re.findall(r'<input.*?name="(.*?)".*?value="(.*?)".*?>', form[1], re.S)
                    if inputs:
                        data = {field_name: field_value for field_name, field_value in inputs}
                        # Replace req with the response of the confirmation post
                        req = _new_request().post_res(url=action, data=data)
                        if req and req.status_code == 200:
                            # Raises when the response is not a torrent file
                            Torrent.from_string(req.content)
                            # Confirmation page skipped successfully
                            logger.info(f"触发了站点首次种子下载,已自动跳过:{url}")
                            skip_flag = True
                        elif req is not None:
                            logger.warn(f"触发了站点首次种子下载,且无法自动跳过,"
                                        f"返回码:{req.status_code},错误原因:{req.reason}")
                        else:
                            logger.warn(f"触发了站点首次种子下载,且无法自动跳过:{url}")
                    # Only the first form whose action is "?" is tried
                    break
            except Exception as err:
                logger.warn(f"触发了站点首次种子下载,尝试自动跳过时出现错误:{str(err)},链接:{url}")
            if not skip_flag:
                return cache_path, None, "", [], "种子数据有误,请确认链接是否正确,如为PT站点则需手工在站点下载一次种子"
        # Torrent content
        if req.content:
            # Verify that the content really is a torrent file
            try:
                # Torrent folder and file list
                folder_name, file_list = self.get_fileinfo_from_torrent_content(req.content)
                if file_list:
                    # Save to the cache
                    cache_backend.set(cache_path.as_posix(), req.content, region="torrents")
                    # Torrent data obtained successfully
                    return cache_path, req.content, folder_name, file_list, ""
            except Exception as err:
                logger.error(f"种子文件解析失败:{str(err)}")
            # The torrent data is still wrong
            return cache_path, None, "", [], "种子数据有误,请确认链接是否正确"
        # Failure
        return cache_path, None, "", [], ""
    elif req is None:
        return cache_path, None, "", [], "无法打开链接"
    elif req.status_code == 429:
        return cache_path, None, "", [], "触发站点流控,请稍后重试"
    else:
        # Remember the failing torrent URL so that it is not used again
        self.add_invalid(url)
        return cache_path, None, "", [], f"下载种子出错,状态码:{req.status_code}"
|
||
|
||
def get_torrent_info(self, torrent_path: Path) -> Tuple[str, List[str]]:
    """
    Read a torrent file from disk and report its folder name and file list.

    :param torrent_path: path of the torrent file
    :return: folder name and file list; a missing path or a parse failure yields ``("", [])``
    """
    path_is_usable = bool(torrent_path) and torrent_path.exists()
    if path_is_usable:
        try:
            # Parse the file, then extract the file list from the parsed torrent
            return self.get_fileinfo_from_torrent(Torrent.from_file(torrent_path))
        except Exception as err:
            logger.error(f"种子文件解析失败:{str(err)}")
    return "", []
|
||
|
||
@staticmethod
def get_fileinfo_from_torrent(torrent: Torrent) -> Tuple[str, List[str]]:
    """
    Extract the folder name and the file list from a parsed torrent.

    :param torrent: parsed torrent object
    :return: folder name and file list; a single-file torrent returns an empty folder name
    """
    if not torrent or not torrent.files:
        return "", []

    entries = torrent.files
    single_file = len(entries) == 1 and entries[0].name == torrent.name
    if single_file:
        # Single-file torrent: no folder, the torrent name is the only file
        folder_name, file_list = "", [torrent.name]
    else:
        # The torrent name is the folder name
        folder_name = torrent.name

        def _without_root(raw_name):
            # Drop the first path component when it equals the torrent name
            parsed = Path(raw_name)
            top_level = parsed.parts[0]
            if top_level != folder_name:
                return raw_name
            return str(parsed.relative_to(top_level))

        file_list = [_without_root(entry.name) for entry in entries]

    logger.debug(f"解析种子:{torrent.name} => 目录:{folder_name},文件清单:{file_list}")
    return folder_name, file_list
|
||
|
||
def get_fileinfo_from_torrent_content(self, torrent_content: Union[str, bytes]) -> Tuple[str, List[str]]:
    """
    Extract the folder name and the file list from raw torrent content.

    :param torrent_content: torrent content
    :return: folder name and file list; empty content, a magnet link or a parse
             failure yields ``("", [])``
    """
    # Empty content and magnet links carry no file information
    if not torrent_content or StringUtils.is_magnet_link(torrent_content):
        return "", []

    try:
        # Parse the content, then extract the file list from the parsed torrent
        parsed_torrent = Torrent.from_string(torrent_content)
        fileinfo = self.get_fileinfo_from_torrent(parsed_torrent)
    except Exception as err:
        logger.error(f"种子内容解析失败:{str(err)}")
        fileinfo = ("", [])
    return fileinfo
|
||
|
||
@staticmethod
|
||
def get_url_filename(req: Any, url: str) -> str:
|
||
"""
|
||
从下载请求中获取种子文件名
|
||
"""
|
||
if not req:
|
||
return ""
|
||
disposition = req.headers.get('content-disposition') or ""
|
||
file_name = re.findall(r"filename=\"?(.+)\"?", disposition)
|
||
if file_name:
|
||
file_name = unquote(str(file_name[0].encode('ISO-8859-1').decode()).split(";")[0].strip())
|
||
if file_name.endswith('"'):
|
||
file_name = file_name[:-1]
|
||
elif url and url.endswith(".torrent"):
|
||
file_name = unquote(url.split("/")[-1])
|
||
else:
|
||
file_name = str(datetime.datetime.now())
|
||
return file_name
|
||
|
||
@staticmethod
def sort_torrents(torrent_list: List[Context]) -> List[Context]:
    """
    Sort torrents by the configured priority rules: torrent, site, upload, seeder.

    Every context is mapped to a fixed-width sort string (title, then the fragments of the
    configured rules in order, then season/episode counts) and the list is sorted by that
    string in descending order.
    """
    if not torrent_list:
        return []

    # Download priority rules
    priority_rule: List[str] = SystemConfigOper().get(
        SystemConfigKey.TorrentsPriority) or ["torrent", "upload", "seeder"]
    # Upload amount of every site
    site_uploads = {}
    for site in SiteOper().get_userdata_latest():
        site_uploads[site.name] = site.upload

    def _zero_padded(value, width):
        # Left-pad with zeros so that string comparison follows the numeric order
        return str(value).rjust(width, '0')

    def get_sort_str(_context):
        """
        Build the sort string of one context
        """
        meta = _context.meta_info
        torrent = _context.torrent_info
        media = _context.media_info
        # One fragment per known rule
        fragments = {
            # Resource priority
            "torrent": _zero_padded(torrent.pri_order or 0, 3),
            # Site priority
            "site": _zero_padded(999 - (torrent.site_order or 0), 3),
            # Site upload amount
            "upload": _zero_padded(site_uploads.get(torrent.site_name) or 0, 30),
            # Seeders of the resource
            "seeder": _zero_padded(torrent.seeders or 0, 10),
        }
        # Season/episode: no episodes ranks first, otherwise more episodes rank higher
        if meta.episode_list:
            episode_part = _zero_padded(len(meta.episode_list), 4)
        else:
            episode_part = "9999"
        season_part = _zero_padded(len(meta.season_list), 3)
        # Title first, then the fragments in the configured rule order
        pieces = [str(media.title).ljust(200, ' ')]
        pieces.extend(fragments[rule] for rule in priority_rule if rule in fragments)
        pieces.append(season_part + episode_part)
        return "".join(pieces)

    # Sort
    return sorted(torrent_list, key=get_sort_str, reverse=True)
|
||
|
||
def sort_group_torrents(self, torrent_list: List[Context]) -> List[Context]:
    """
    Sort the torrents and keep only the first one of every media item.

    The de-duplication key is the media ``title_year``, extended with the recognised
    season/episode string for TV media.

    :param torrent_list: contexts to sort and de-duplicate
    :return: sorted contexts, one per de-duplication key
    """
    if not torrent_list:
        return []

    # Sort
    ordered = self.sort_torrents(torrent_list)

    # De-duplicate; a set keeps the membership test constant-time
    result = []
    seen_names = set()
    # Walk the sorted list and keep the first context of every key
    for context in ordered:
        meta = context.meta_info
        media = context.media_info
        if media.type == MediaType.TV:
            media_name = "%s%s" % (media.title_year,
                                   meta.season_episode)
        else:
            media_name = media.title_year
        if media_name in seen_names:
            continue
        seen_names.add(media_name)
        result.append(context)

    return result
|
||
|
||
@staticmethod
|
||
def get_torrent_episodes(files: list) -> list:
|
||
"""
|
||
从种子的文件清单中获取所有集数
|
||
"""
|
||
episodes = []
|
||
for file in files:
|
||
if not file:
|
||
continue
|
||
file_path = Path(file)
|
||
if not file_path.suffix or file_path.suffix.lower() not in settings.RMT_MEDIAEXT:
|
||
continue
|
||
# 只使用文件名识别
|
||
meta = MetaInfo(file_path.name)
|
||
if not meta.begin_episode:
|
||
continue
|
||
episodes = list(set(episodes).union(set(meta.episode_list)))
|
||
return episodes
|
||
|
||
def is_invalid(self, url: Optional[str]) -> bool:
    """
    Tell whether a torrent URL is recorded as invalid; an empty URL always counts as invalid.
    """
    if not url:
        return True
    return url in self._invalid_torrents
|
||
|
||
def add_invalid(self, url: str):
    """
    Record a torrent URL as invalid unless it is already recorded.
    """
    known = self._invalid_torrents
    if url in known:
        return
    known[url] = True
|
||
|
||
@staticmethod
def match_torrent(mediainfo: MediaInfo, torrent_meta: MetaBase, torrent: TorrentInfo) -> bool:
    """
    Check whether a torrent matches the given media.

    The checks run in this order: ids assigned to the torrent (TMDB / Douban), media type,
    year, recognised names against title and original title, recognised names against
    aliases and translations, non-English tokens of the torrent title, and finally
    non-English tokens of the torrent description.

    :param mediainfo: media information to match against
    :param torrent_meta: recognised meta information of the torrent
    :param torrent: torrent information
    :return: True when the torrent matches the media
    """
    # Compare the ids assigned to the torrent
    if torrent_meta.tmdbid or torrent_meta.doubanid:
        if torrent_meta.tmdbid and torrent_meta.tmdbid == mediainfo.tmdb_id:
            logger.info(
                f'{mediainfo.title} 通过词表指定TMDBID匹配到资源:{torrent.site_name} - {torrent.title}')
            return True
        if torrent_meta.doubanid and torrent_meta.doubanid == mediainfo.douban_id:
            logger.info(
                f'{mediainfo.title} 通过词表指定豆瓣ID匹配到资源:{torrent.site_name} - {torrent.title}')
            return True
    # Media title and original title to match
    media_titles = {
        StringUtils.clear_upper(mediainfo.title),
        StringUtils.clear_upper(mediainfo.original_title)
    } - {""}
    # Media aliases and translated names to match. The empty string is discarded like it is
    # for the other name sets, so an alias that normalises to nothing can never match an
    # empty token of the description.
    media_names = {
        StringUtils.clear_upper(name) for name in (mediainfo.names or []) if name
    } - {""}
    # Recognised Chinese and English names of the torrent
    meta_names = {
        StringUtils.clear_upper(torrent_meta.cn_name),
        StringUtils.clear_upper(torrent_meta.en_name)
    } - {""}
    # Compare the recognised type of the torrent
    if torrent_meta.type == MediaType.TV and mediainfo.type != MediaType.TV:
        logger.debug(f'{torrent.site_name} - {torrent.title} 种子标题类型为 {torrent_meta.type.value},'
                     f'不匹配 {mediainfo.type.value}')
        return False
    # Compare the category of the torrent on the site
    if torrent.category == MediaType.TV.value and mediainfo.type != MediaType.TV:
        logger.debug(f'{torrent.site_name} - {torrent.title} 种子在站点中归类为 {torrent.category},'
                     f'不匹配 {mediainfo.type.value}')
        return False
    # Compare the year
    if mediainfo.year:
        if mediainfo.type == MediaType.TV:
            # TV: every season may have its own year; a torrent without year is not
            # compared (many TV torrents carry no year)
            if torrent_meta.year and torrent_meta.year not in list(mediainfo.season_years.values()):
                logger.debug(f'{torrent.site_name} - {torrent.title} 年份不匹配 {mediainfo.season_years}')
                return False
        else:
            # Movie: a tolerance of one year is accepted; a torrent without year is rejected
            accepted_years = [str(int(mediainfo.year) - 1),
                              mediainfo.year,
                              str(int(mediainfo.year) + 1)]
            if not torrent_meta.year or torrent_meta.year not in accepted_years:
                logger.debug(f'{torrent.site_name} - {torrent.title} 年份不匹配 {mediainfo.year}')
                return False
    # Compare with title and original title
    if meta_names.intersection(media_titles):
        logger.info(f'{mediainfo.title} 通过标题匹配到资源:{torrent.site_name} - {torrent.title}')
        return True
    # Compare with aliases and translated names
    if media_names:
        if meta_names.intersection(media_names):
            logger.info(f'{mediainfo.title} 通过别名或译名匹配到资源:{torrent.site_name} - {torrent.title}')
            return True
    # Split the torrent title
    if torrent_meta.org_string:
        # Only the non-English tokens are used, single English words match too easily
        titles = [StringUtils.clear_upper(t) for t in re.split(
            r'[\s/【】.\[\]\-]+',
            torrent_meta.org_string
        ) if not StringUtils.is_english_word(t)]
        # Look for title or original title among the tokens
        if media_titles.intersection(titles):
            logger.info(f'{mediainfo.title} 通过标题匹配到资源:{torrent.site_name} - {torrent.title}')
            return True
    # Look for title, original title, aliases and translations among the non-English
    # tokens of the description
    if torrent.description:
        subtitles = {StringUtils.clear_upper(t) for t in re.split(
            r'[\s/【】|]+',
            torrent.description) if not StringUtils.is_english_word(t)} - {""}
        if media_titles.intersection(subtitles) or media_names.intersection(subtitles):
            logger.info(f'{mediainfo.title} 通过副标题匹配到资源:{torrent.site_name} - {torrent.title},'
                        f'副标题:{torrent.description}')
            return True
    # No match
    logger.debug(f'{torrent.site_name} - {torrent.title} 标题不匹配,识别名称:{meta_names}')
    return False
|
||
|
||
@staticmethod
def filter_torrent(torrent_info: TorrentInfo,
                   filter_params: Dict[str, str]) -> bool:
    """
    Check whether a torrent passes the filter rules of a subscription.

    Supported keys of ``filter_params``: ``include`` and ``exclude`` (regular expressions
    searched in title, description, labels and volume factor), ``quality``, ``resolution``
    and ``effect`` (regular expressions searched in the title) and ``size`` (``min-max``,
    ``>min`` or ``<max``; the numbers are multiplied by 1024 * 1024 before being compared
    with the torrent size). All regular expressions are matched case-insensitively.

    A missing title or description is treated as empty text and a missing size as 0.

    :param torrent_info: torrent to check
    :param filter_params: filter rules; no rules means the torrent passes
    :return: True when the torrent passes every rule
    """
    if not filter_params:
        return True

    # Missing values must not end up as the text "None" in the matched content
    title = torrent_info.title or ""
    description = torrent_info.description or ""

    # Content the include / exclude rules are matched against
    content = (f"{title} "
               f"{description} "
               f"{' '.join(torrent_info.labels or [])} "
               f"{torrent_info.volume_factor}")

    # Include
    include = filter_params.get("include")
    if include:
        if not re.search(r"%s" % include, content, re.I):
            logger.info(f"{content} 不匹配包含规则 {include}")
            return False
    # Exclude
    exclude = filter_params.get("exclude")
    if exclude:
        if re.search(r"%s" % exclude, content, re.I):
            logger.info(f"{content} 匹配排除规则 {exclude}")
            return False
    # Quality, resolution and effect are all searched in the title
    for rule_key, rule_label in (("quality", "质量"), ("resolution", "分辨率"), ("effect", "特效")):
        pattern = filter_params.get(rule_key)
        if pattern and not re.search(r"%s" % pattern, title, re.I):
            logger.info(f"{title} 不匹配{rule_label}规则 {pattern}")
            return False

    # Size
    size_range = filter_params.get("size")
    if size_range:
        torrent_size = torrent_info.size or 0
        if "-" in size_range:
            # Range
            size_min, size_max = size_range.split("-")
            size_min = float(size_min.strip()) * 1024 * 1024
            size_max = float(size_max.strip()) * 1024 * 1024
            if torrent_size < size_min or torrent_size > size_max:
                return False
        elif size_range.startswith(">"):
            # Greater than
            size_min = float(size_range[1:].strip()) * 1024 * 1024
            if torrent_size < size_min:
                return False
        elif size_range.startswith("<"):
            # Less than
            size_max = float(size_range[1:].strip()) * 1024 * 1024
            if torrent_size > size_max:
                return False

    return True
|
||
|
||
@staticmethod
def match_season_episodes(torrent: TorrentInfo, meta: MetaBase, season_episodes: Dict[int, list]) -> bool:
    """
    Tell whether the seasons and episodes of a torrent fit the wanted ones.

    :param torrent: torrent information
    :param meta: recognised meta information of the torrent
    :param season_episodes: wanted seasons and episodes, {season: [episodes]}
    :return: True when the torrent fits
    """
    # Seasons of the torrent, a torrent without season is handled as season 1
    torrent_seasons = meta.season_list or [1]
    # Episodes of the torrent
    torrent_episodes = meta.episode_list

    # Every season of the torrent has to be a wanted season
    if not set(torrent_seasons) <= set(season_episodes):
        logger.debug(
            f"种子 {torrent.site_name} - {torrent.title} 包含季 {torrent_seasons} 不是需要的季 {list(season_episodes.keys())}")
        return False

    # A whole-season torrent is accepted as it is
    if not torrent_episodes:
        return True

    # Episodes are only compared for single-season torrents
    if len(torrent_seasons) != 1:
        return True

    need_episodes = season_episodes.get(torrent_seasons[0])
    if need_episodes and set(torrent_episodes).isdisjoint(need_episodes):
        # A single-season torrent sharing no episode with the wanted ones is rejected
        logger.debug(f"种子 {torrent.site_name} - {torrent.title} "
                     f"集 {torrent_episodes} 没有需要的集:{need_episodes}")
        return False
    return True
|