Merge pull request #5028 from ThedoRap/v2

This commit is contained in:
jxxghp
2025-10-07 23:00:21 +08:00
committed by GitHub
2 changed files with 134 additions and 101 deletions

View File

@@ -1,46 +1,38 @@
#
# 极速之星 https://bitpt.cn/
# author: ThedoRap
# time: 2025-10-02
#
# -*- coding: utf-8 -*-
import re
from typing import Optional, Tuple
from urllib.parse import urljoin, urlencode
from bs4 import BeautifulSoup
from app.modules.indexer.parser import SiteParserBase, SiteSchema
from app.utils.string import StringUtils
class BitptSiteUserInfo(SiteParserBase):
schema = SiteSchema.Bitpt
def _parse_site_page(self, html_text: str):
"""
Set the site page URLs
"""
self._user_basic_page = "userdetails.php?uid={uid}"  # uid is filled in during parse()
self._user_detail_page = None
self._user_basic_params = {}
self._user_traffic_page = None
self._sys_mail_unread_page = None
self._user_mail_unread_page = None
self._mail_unread_params = {}
self._torrent_seeding_base = "browse.php"
self._torrent_seeding_params = {"t": "myseed", "st": "2", "d": "desc"}
self._torrent_seeding_headers = {}
self._addition_headers = {}
def _parse_logged_in(self, html_text):
"""
Determine whether the user is logged in by checking for the user info block
"""
soup = BeautifulSoup(html_text, 'html.parser')
return bool(soup.find(id='userinfotop'))
def _parse_user_base_info(self, html_text: str):
"""
Parse basic user info; _parse_user_traffic_info and _parse_user_detail_info are merged into this method
"""
if not html_text:
return None
soup = BeautifulSoup(html_text, 'html.parser')
@@ -67,80 +59,63 @@ class BitptSiteUserInfo(SiteParserBase):
self.ratio = float(info_dict.get('共享率')) if '共享率' in info_dict else 0
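# The '星辰' (bonus) field embeds a cumulative total such as "累计123.45"; the regex below extracts that number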
bonus_str = info_dict.get('星辰', '')
self.bonus = float(re.search(r'累计([\d\.]+)', bonus_str).group(1)) if re.search(r'累计([\d\.]+)', bonus_str) else 0
self.message_unread = 0  # no message parsing yet
if hasattr(self, '_torrent_seeding_base') and self._torrent_seeding_base:
# Paged seeding parsing is available; reset the counters here, parse() sums the per-page results
self.seeding = 0
self.seeding_size = 0
else:
# Fall back to the seeding summary at the bottom of the page
seeding_info = soup.find('div', style="margin:0 auto;width:90%;font-size:14px;margin-top:10px;margin-bottom:10px;text-align:center;")
if seeding_info:
seeding_link = seeding_info.find_all('a')[1].text if len(seeding_info.find_all('a')) > 1 else ''
match = re.search(r'当前上传的种子\((\d+)个, 共([\d\.]+ [KMGT]B)\)', seeding_link)
if match:
self.seeding = int(match.group(1))
self.seeding_size = StringUtils.num_filesize(match.group(2))
else:
self.seeding = 0
self.seeding_size = 0
def _parse_user_traffic_info(self, html_text: str):
"""
Parse user traffic info
"""
pass
def _parse_user_detail_info(self, html_text: str):
"""
Parse user detail info
"""
pass
def _parse_user_torrent_seeding_page_info(self, html_text: str) -> Tuple[int, int]:
"""
Parse one page of the user's seeding list; returns (count, total size) for the page
"""
if not html_text:
return 0, 0
soup = BeautifulSoup(html_text, 'html.parser')
torrent_table = soup.find('table', class_='torrenttable')
if not torrent_table:
return 0, 0
rows = torrent_table.find_all('tr')
if len(rows) <= 1:
return 0, 0
# Data rows carry the classes btr0/btr1, so match on the prefix rather than the literal 'btr'
torrents = [row for row in rows[1:] if any(c.startswith('btr') for c in (row.get('class') or []))]
page_seeding = 0
page_seeding_size = 0
for torrent in torrents:
size_td = torrent.find('td', class_='r')
if size_td:
size_a = size_td.find('a')
size_text = size_a.text.strip() if size_a else size_td.text.strip()
if size_text:
page_seeding += 1
page_seeding_size += StringUtils.num_filesize(size_text)
return page_seeding, page_seeding_size
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
"""
Parse unread message links; message details are read directly here
"""
pass
def _parse_message_content(self, html_text) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
Parse message content
"""
pass
def _parse_user_torrent_seeding_info(self, html_text: str):
"""
Placeholder to satisfy the abstract base class; seeding is parsed per page instead
"""
pass
def parse(self):
"""
Parse site info
"""
super().parse()
# First, parse the userid from the index page
if self._index_html:
soup = BeautifulSoup(self._index_html, 'html.parser')
user_link = soup.find('a', href=re.compile(r'userdetails\.php\?uid=\d+'))
@@ -148,8 +123,39 @@ class BitptSiteUserInfo(SiteParserBase):
uid_match = re.search(r'uid=(\d+)', user_link['href'])
if uid_match:
self.userid = uid_match.group(1)
# If a userid was found, format _user_basic_page and fetch it
if self.userid and self._user_basic_page:
basic_url = self._user_basic_page.format(uid=self.userid)
basic_html = self._get_page_content(url=urljoin(self._base_url, basic_url))
self._parse_user_base_info(basic_html)
if hasattr(self, '_torrent_seeding_base') and self._torrent_seeding_base:
seeding_base_url = urljoin(self._base_url, self._torrent_seeding_base)
params = self._torrent_seeding_params.copy()
page_num = 1
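# Page through the seeding list until an empty page is returned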
while True:
params['p'] = page_num
query_string = urlencode(params)
full_url = f"{seeding_base_url}?{query_string}"
seeding_html = self._get_page_content(url=full_url)
page_seeding, page_seeding_size = self._parse_user_torrent_seeding_page_info(seeding_html)
self.seeding += page_seeding
self.seeding_size += page_seeding_size
if page_seeding == 0:
break
page_num += 1
# 🔑 Finally, normalize everything to strings for external consumers
self.userid = str(self.userid or "")
self.username = str(self.username or "")
self.user_level = str(self.user_level or "")
self.join_at = str(self.join_at or "")
self.upload = str(self.upload or 0)
self.download = str(self.download or 0)
self.ratio = str(self.ratio or 0)
self.bonus = str(self.bonus or 0.0)
self.message_unread = str(self.message_unread or 0)
self.seeding = str(self.seeding or 0)
self.seeding_size = str(self.seeding_size or 0)

View File

@@ -1,3 +1,8 @@
#
# 知行 http://pt.zhixing.bjtu.edu.cn/
# author: ThedoRap
# time: 2025-10-02
#
# -*- coding: utf-8 -*-
import re
from typing import Optional, Tuple
@@ -22,7 +27,7 @@ class ZhixingSiteUserInfo(SiteParserBase):
self._sys_mail_unread_page = None
self._user_mail_unread_page = None
self._mail_unread_params = {}
self._torrent_seeding_base = "user/{uid}/seeding"
self._torrent_seeding_params = {}
self._torrent_seeding_headers = {}
self._addition_headers = {}
@@ -58,6 +63,8 @@ class ZhixingSiteUserInfo(SiteParserBase):
value = re.split(r'\s*\(', value_text)[0].strip().split('查看')[0].strip()
info_dict[key] = value
self._basic_info = info_dict # Save for fallback
self.userid = info_dict.get('UID')
self.username = info_dict.get('用户名')
self.user_level = info_dict.get('用户组')
@@ -76,27 +83,22 @@ class ZhixingSiteUserInfo(SiteParserBase):
self.bonus = float(info_dict.get('保种积分')) if '保种积分' in info_dict else 0.0
self.message_unread = 0  # no message parsing yet
# Temporarily set seeding from basic, will override or fallback later
self.seeding = int(info_dict.get('当前保种数量')) if '当前保种数量' in info_dict else 0
# num_filesize_safe is only defined later, inside parse(), so inline the same unit guard here to avoid a NameError
seeding_size_text = (info_dict.get('当前保种容量') or '').strip()
if seeding_size_text and re.match(r'^\d+(\.\d+)?$', seeding_size_text):
seeding_size_text += ' B'
self.seeding_size = StringUtils.num_filesize(seeding_size_text) if seeding_size_text else 0
def _parse_user_traffic_info(self, html_text: str):
"""
Parse user traffic info
"""
pass
def _parse_user_detail_info(self, html_text: str):
"""
Parse user detail info
"""
pass
def _parse_user_torrent_seeding_page_info(self, html_text: str) -> Tuple[int, int]:
"""
Parse one page of the user's seeding list; returns (count, total size) for this page
"""
if not html_text:
return 0, 0
soup = BeautifulSoup(html_text, 'html.parser')
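# Torrent rows on this site carry ids like "t123"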
torrents = soup.find_all('tr', id=re.compile(r'^t\d+'))
page_seeding = 0
@@ -107,30 +109,17 @@ class ZhixingSiteUserInfo(SiteParserBase):
size_text = size_td.find('a').text.strip() if size_td.find('a') else size_td.text.strip()
page_seeding += 1
page_seeding_size += StringUtils.num_filesize(size_text)
return page_seeding, page_seeding_size
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
"""
Parse unread message links; message details are read directly here
"""
pass
def _parse_message_content(self, html_text) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
Parse message content
"""
pass
def _parse_user_torrent_seeding_info(self, html_text: str):
"""
Placeholder to satisfy the abstract base class
"""
pass
@@ -153,5 +142,43 @@ class ZhixingSiteUserInfo(SiteParserBase):
basic_url = self._user_basic_page.format(uid=self.userid)
basic_html = self._get_page_content(url=urljoin(self._base_url, basic_url))
self._parse_user_base_info(basic_html)
if hasattr(self, '_torrent_seeding_base') and self._torrent_seeding_base:
self.seeding = 0 # Reset to sum from pages
self.seeding_size = 0
seeding_base = self._torrent_seeding_base.format(uid=self.userid)
seeding_base_url = urljoin(self._base_url, seeding_base)
page_num = 1
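# Zhixing paginates the seeding list via a /p{n} path suffix; stop when a page yields no torrents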
while True:
seeding_url = f"{seeding_base_url}/p{page_num}"
seeding_html = self._get_page_content(url=seeding_url)
page_seeding, page_seeding_size = self._parse_user_torrent_seeding_page_info(seeding_html)
self.seeding += page_seeding
self.seeding_size += page_seeding_size
if page_seeding == 0:
break
page_num += 1
# Fallback to basic if no seeding found from pages
if self.seeding == 0 and hasattr(self, '_basic_info'):
def num_filesize_safe(s: str):
if s:
s = s.strip()
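# A bare number with no unit is treated as bytes ('123' -> '123 B')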
if re.match(r'^\d+(\.\d+)?$', s):
s += ' B'
return StringUtils.num_filesize(s) if s else 0
self.seeding = int(self._basic_info.get('当前保种数量', 0))
self.seeding_size = num_filesize_safe(self._basic_info.get('当前保种容量', ''))
# 🔑 Finally, normalize everything to strings for external consumers, avoiding join errors downstream
self.userid = str(self.userid or "")
self.username = str(self.username or "")
self.user_level = str(self.user_level or "")
self.join_at = str(self.join_at or "")
self.upload = str(self.upload or 0)
self.download = str(self.download or 0)
self.ratio = str(self.ratio or 0)
self.bonus = str(self.bonus or 0.0)
self.message_unread = str(self.message_unread or 0)
self.seeding = str(self.seeding or 0)
self.seeding_size = str(self.seeding_size or 0)