Files
2026-01-09 22:08:24 +08:00

473 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import Optional
from urllib.parse import urljoin, urlsplit
from requests import Session
from app.core.config import settings
from app.helper.cloudflare import under_challenge
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
# 站点框架
class SiteSchema(Enum):
    """
    Identifiers of the site frameworks (schemas) a parser can be written for.

    Each member's value is the schema's display string; ``SiteParserBase.to_dict``
    serialises a ``SiteSchema`` attribute through its ``.value``.
    """
    DiscuzX = "DiscuzX"
    Gazelle = "Gazelle"
    Ipt = "IPTorrents"
    NexusPhp = "NexusPhp"
    NexusProject = "NexusProject"
    NexusRabbit = "NexusRabbit"
    NexusHhanclub = "NexusHhanclub"
    NexusAudiences = "NexusAudiences"
    SmallHorse = "Small Horse"
    Unit3d = "Unit3d"
    TorrentLeech = "TorrentLeech"
    FileList = "FileList"
    TNode = "TNode"
    MTorrent = "MTorrent"
    Yema = "Yema"
    HDDolby = "HDDolby"
    Zhixing = "Zhixing"
    Bitpt = "Bitpt"
    RousiPro = "RousiPro"
class SiteParserBase(metaclass=ABCMeta):
    """
    Abstract base class for site user-data parsers.

    ``parse`` drives the workflow: it fetches the pages named by the
    ``_user_*_page`` / ``_torrent_seeding_page`` attributes and hands every
    response body to the matching abstract ``_parse_*`` hook, which concrete
    subclasses implement. Results are stored on public instance attributes
    (``upload``, ``download``, ``seeding`` ...) and can be exported with
    ``to_dict``.
    """
    # Site schema handled by the parser (None on the base class)
    schema = None
    # Request mode: "cookie" or "apikey"
    request_mode = "cookie"

    def __init__(self, site_name: str,
                 url: str,
                 site_cookie: str,
                 apikey: str,
                 token: str,
                 session: Optional[Session] = None,
                 ua: Optional[str] = None,
                 emulate: bool = False,
                 proxy: Optional[bool] = None):
        """
        Store the site connection settings and reset every parsed field.

        :param site_name: site name, used in log messages
        :param url: site url; scheme and host of it become the base url
        :param site_cookie: cookie sent with requests in cookie mode
        :param apikey: api key (checked by ``parse`` in apikey mode)
        :param token: api token (checked by ``parse`` in apikey mode)
        :param session: optional session reused for cookie-mode requests
        :param ua: optional User-Agent sent in cookie mode
        :param emulate: stored as ``_emulate``; not read inside this class
        :param proxy: when truthy, requests go through ``settings.PROXY``
        """
        super().__init__()
        # Site information
        self.apikey = apikey
        self.token = token
        self._site_name = site_name
        self._site_url = url
        split_url = urlsplit(url)
        self._site_domain = split_url.netloc
        self._base_url = f"{split_url.scheme}://{split_url.netloc}"
        self._site_cookie = site_cookie
        self._session = session if session else None
        self._ua = ua
        self._emulate = emulate
        self._proxy = proxy
        self._index_html = ""
        # User information
        self.username = None
        self.userid = None
        self.user_level = None
        self.join_at = None
        self.bonus = 0.0
        # Traffic information
        self.upload = 0
        self.download = 0
        self.ratio = 0
        # Seeding information
        self.seeding = 0
        self.leeching = 0
        self.seeding_size = 0
        self.leeching_size = 0
        self.uploaded = 0
        self.completed = 0
        self.incomplete = 0
        self.uploaded_size = 0
        self.completed_size = 0
        self.incomplete_size = 0
        # Seeder count / torrent size entries (filled by subclasses)
        self.seeding_info = []
        # Unread messages
        self.message_unread = 0
        self.message_unread_contents = []
        # When True, the unread mailboxes are fetched even if message_unread is 0
        self.message_read_force = False
        # Headers added to every request
        self._addition_headers = None
        # User basic info: page / post params / headers
        self._user_basic_page = None
        self._user_basic_params = None
        self._user_basic_headers = None
        # User detail info: page / post params / headers
        self._user_detail_page = "userdetails.php?id="
        self._user_detail_params = None
        self._user_detail_headers = None
        # User traffic info: page / post params / headers
        self._user_traffic_page = "index.php"
        self._user_traffic_params = None
        self._user_traffic_headers = None
        # Unread user-message list page
        self._user_mail_unread_page = "messages.php?action=viewmailbox&box=1&unread=yes"
        # Unread system-message list page
        self._sys_mail_unread_page = "messages.php?action=viewmailbox&box=-2&unread=yes"
        # Unread message list: post params / headers
        self._mail_unread_params = None
        self._mail_unread_headers = None
        # Unread message content: post params / headers
        self._mail_content_params = None
        self._mail_content_headers = None
        # User seeding info: page / post params / headers
        self._torrent_seeding_page = "getusertorrentlistajax.php?userid="
        self._torrent_seeding_params = None
        self._torrent_seeding_headers = None
        # Error message
        self.err_msg = None

    def site_schema(self) -> Optional[SiteSchema]:
        """
        Site schema handled by this parser
        :return: the class-level ``schema`` value
        """
        return self.schema

    def parse(self):
        """
        Fetch and parse all site information.

        In apikey mode the index page is not fetched (an empty dict is passed to
        the hooks instead) and parsing is skipped when neither apikey nor token
        is set. In cookie mode the index page is fetched and parsing stops when
        the login check fails. The session is always closed at the end.
        :return:
        """
        try:
            if self.request_mode == "apikey":
                # apikey mode: credentials are required, no index page is loaded
                if not self.apikey and not self.token:
                    logger.warn(f"{self._site_name} 未设置cookie 或 apikey/token跳过后续操作")
                    return
                self._index_html = {}
            else:
                # cookie mode: load the index page and check the login state
                self._index_html = self._get_page_content(url=self._site_url)
                if not self._parse_logged_in(self._index_html):
                    return
            # Site page
            self._parse_site_page(self._index_html)
            # User basic info, falling back to the index page
            if self._user_basic_page:
                self._parse_user_base_info(
                    self._get_page_content(
                        url=urljoin(self._base_url, self._user_basic_page),
                        params=self._user_basic_params,
                        headers=self._user_basic_headers
                    )
                )
            else:
                self._parse_user_base_info(self._index_html)
            # User detail info
            if self._user_detail_page:
                self._parse_user_detail_info(
                    self._get_page_content(
                        url=urljoin(self._base_url, self._user_detail_page),
                        params=self._user_detail_params,
                        headers=self._user_detail_headers
                    )
                )
            # Unread messages
            if settings.SITE_MESSAGE:
                self._pase_unread_msgs()
            # Upload / download / ratio
            if self._user_traffic_page:
                self._parse_user_traffic_info(
                    self._get_page_content(
                        url=urljoin(self._base_url, self._user_traffic_page),
                        params=self._user_traffic_params,
                        headers=self._user_traffic_headers
                    )
                )
            # Seeding info
            self._parse_seeding_pages()
        finally:
            # Close the connection
            self.close()

    def _pase_unread_msgs(self):
        """
        Collect the links of all unread messages, then fetch and parse each one.

        Parsed ``(head, date, content)`` tuples are appended to
        ``message_unread_contents``.
        :return:
        """
        unread_msg_links = []
        if self.message_unread > 0 or self.message_read_force:
            # De-duplicate while keeping a stable order (user mailbox first);
            # a set would iterate in a process-dependent order.
            links = dict.fromkeys((self._user_mail_unread_page, self._sys_mail_unread_page))
            for link in links:
                if not link:
                    continue
                msg_links = []
                next_page = self._parse_message_unread_links(
                    self._get_page_content(
                        url=urljoin(self._base_url, link),
                        params=self._mail_unread_params,
                        headers=self._mail_unread_headers
                    ),
                    msg_links)
                # Follow pagination until the hook reports no further page
                while next_page:
                    next_page = self._parse_message_unread_links(
                        self._get_page_content(
                            url=urljoin(self._base_url, next_page),
                            params=self._mail_unread_params,
                            headers=self._mail_unread_headers
                        ),
                        msg_links
                    )
                unread_msg_links.extend(msg_links)
        # Fill in the unread count from the collected links when it is still 0
        # (original note: 99999 means "has messages, count unknown"; that
        # sentinel is not handled here)
        if unread_msg_links and not self.message_unread:
            self.message_unread = len(unread_msg_links)
        # Parse the content of every unread message
        for msg_link in unread_msg_links:
            logger.debug(f"{self._site_name} 信息链接 {msg_link}")
            head, date, content = self._parse_message_content(
                self._get_page_content(
                    urljoin(self._base_url, msg_link),
                    params=self._mail_content_params,
                    headers=self._mail_content_headers
                )
            )
            logger.debug(f"{self._site_name} 标题 {head} 时间 {date} 内容 {content}")
            self.message_unread_contents.append((head, date, content))

    def _parse_seeding_pages(self):
        """
        Fetch and parse the seeding pages, following pagination
        """
        if not self._torrent_seeding_page:
            return
        seeding_url = urljoin(self._base_url, self._torrent_seeding_page)
        # First page
        next_page = self._parse_user_torrent_seeding_info(
            self._get_page_content(
                url=seeding_url,
                params=self._torrent_seeding_params,
                headers=self._torrent_seeding_headers
            )
        )
        # Remaining pages: only None / False end the loop, so an empty string
        # returned by the hook still counts as "another page"
        while next_page is not None and next_page is not False:
            next_page = self._parse_user_torrent_seeding_info(
                self._get_page_content(
                    url=urljoin(seeding_url, next_page),
                    params=self._torrent_seeding_params,
                    headers=self._torrent_seeding_headers
                ),
                multi_page=True)

    @staticmethod
    def _prepare_html_text(html_text):
        """
        Strip noise from HTML: ``<digits>px`` sizes first, then ``#<digits>`` tokens
        :param html_text: html text
        :return: cleaned text
        """
        return re.sub(r"#\d+", "", re.sub(r"\d+px", "", html_text))

    @abstractmethod
    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        """
        Collect unread message links from a mailbox page
        :param html_text: mailbox page content
        :param msg_links: list the implementation appends message links to
        :return: link of the next mailbox page, or a falsy value when done
        """
        pass

    def _build_request_headers(self, headers: Optional[dict] = None) -> Optional[dict]:
        """
        Build the headers for one request
        :param headers: per-request headers
        :return: merged headers, or None when there is no UA, no per-request
                 headers and no additional headers
        """
        if not (self._ua or headers or self._addition_headers):
            return None
        req_headers = {}
        # apikey mode sends no UA; in cookie mode skip an unset UA, otherwise
        # the literal string "None" would be sent as the User-Agent
        if self.request_mode != "apikey" and self._ua:
            req_headers["User-Agent"] = f"{self._ua}"
        if headers:
            req_headers.update(headers)
        else:
            req_headers.update({
                "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            })
        if self._addition_headers:
            req_headers.update(self._addition_headers)
        return req_headers

    def _get_page_content(self, url: str, params: dict = None, headers: dict = None):
        """
        Fetch the content of a page
        :param url: page url
        :param params: post params; when set the request is a POST, sent as JSON
                       if the Content-Type header is exactly "application/json"
        :param headers: extra request headers
        :return: page text (a JSON string when the Accept header asks for JSON),
                 or "" on failure
        """
        req_headers = self._build_request_headers(headers)
        proxies = settings.PROXY if self._proxy else None
        if self.request_mode == "apikey":
            # apikey requests authenticate through headers only
            cookie = None
            session = None
        else:
            # cookie requests
            cookie = self._site_cookie
            session = self._session
        request = RequestUtils(cookies=cookie,
                               session=session,
                               timeout=60,
                               proxies=proxies,
                               headers=req_headers)
        if params:
            # req_headers may be None here, so guard before reading it
            if req_headers and req_headers.get("Content-Type") == "application/json":
                res = request.post_res(url=url, json=params)
            else:
                res = request.post_res(url=url, data=params)
        else:
            res = request.get_res(url=url)
        # 500 and 403 responses are also passed on to the parsers
        if res is None or res.status_code not in (200, 500, 403):
            return ""
        if req_headers and "application/json" in str(req_headers.get("Accept")):
            try:
                return json.dumps(res.json())
            except (json.JSONDecodeError, ValueError) as e:
                logger.error(f"{self._site_name} API响应JSON解析失败: {e}")
                return ""
        # Cloudflare challenge page: only warn and give up, no emulation here
        if under_challenge(res.text):
            logger.warn(
                f"{self._site_name} 检测到Cloudflare请更新Cookie和UA")
            return ""
        return RequestUtils.get_decoded_html_content(res,
                                                     settings.ENCODING_DETECTION_PERFORMANCE_MODE,
                                                     settings.ENCODING_DETECTION_MIN_CONFIDENCE)

    @abstractmethod
    def _parse_site_page(self, html_text: str):
        """
        Parse the site page (receives the index page content)
        :param html_text: page content
        :return:
        """
        pass

    @abstractmethod
    def _parse_user_base_info(self, html_text: str):
        """
        Parse the user's basic information
        :param html_text: page content
        :return:
        """
        pass

    def _parse_logged_in(self, html_text):
        """
        Check whether the user is logged in; sets ``err_msg`` when not
        :param html_text: index page content
        :return: True/False
        """
        logged_in = SiteUtils.is_logged_in(html_text)
        if not logged_in:
            self.err_msg = "未检测到已登陆请检查cookies是否过期"
            logger.warn(f"{self._site_name} 未登录,跳过后续操作")
        return logged_in

    @abstractmethod
    def _parse_user_traffic_info(self, html_text: str):
        """
        Parse the user's upload, download and ratio information
        :param html_text: page content
        :return:
        """
        pass

    @abstractmethod
    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse the user's seeding information
        :param html_text: page content
        :param multi_page: True for every page after the first
        :return: next page url; None or False ends pagination
        """
        pass

    @abstractmethod
    def _parse_user_detail_info(self, html_text: str):
        """
        Parse the user's detail information
        (join time / level / bonus ...)
        :param html_text: page content
        :return:
        """
        pass

    @abstractmethod
    def _parse_message_content(self, html_text):
        """
        Parse the content of one message
        :param html_text: message page content
        :return: head: message, date: time, content: message content
        """
        pass

    def close(self):
        """
        Close the session, if any, and drop the reference to it
        """
        if self._session:
            self._session.close()
            self._session = None

    def clear(self):
        """
        Clear the cached index page and the collected seeding / message lists
        """
        self._index_html = ""
        self.seeding_info.clear()
        self.message_unread_contents.clear()

    def to_dict(self):
        """
        Convert to a dict of every public, non-callable attribute
        (``SiteSchema`` values are replaced by their ``.value``)
        """
        result = {}
        for attr in dir(self):
            if attr.startswith("_"):
                continue
            value = getattr(self, attr)
            if callable(value):
                continue
            result[attr] = value.value if isinstance(value, SiteSchema) else value
        return result