mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-24 18:40:03 +08:00
@@ -13,7 +13,7 @@ SEARCH_KEY = ["group", "title_raw", "season_raw", "subtitle", "source", "dpi"]
|
||||
|
||||
class FullSeasonGet:
|
||||
def __init__(self):
|
||||
self._get_rss = RequestContent()
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def init_eps_complete_search_str(data: dict):
|
||||
@@ -24,7 +24,8 @@ class FullSeasonGet:
|
||||
|
||||
def get_season_torrents(self, data: dict):
|
||||
keyword = self.init_eps_complete_search_str(data)
|
||||
torrents = self._get_rss.get_torrents(f"https://mikanani.me/RSS/Search?searchstr={keyword}")
|
||||
with RequestContent() as req:
|
||||
torrents = req.get_torrents(f"https://mikanani.me/RSS/Search?searchstr={keyword}")
|
||||
return torrents
|
||||
|
||||
@staticmethod
|
||||
@@ -56,7 +57,8 @@ class FullSeasonGet:
|
||||
self.download_eps(data, download_client)
|
||||
|
||||
def download_collection(self, data, link, download_client: DownloadClient):
|
||||
torrents = self._get_rss.get_torrents(link)
|
||||
with RequestContent() as req:
|
||||
torrents = req.get_torrents(link)
|
||||
downloads = self.collect_season_torrents(data, torrents)
|
||||
logger.info(f"Starting download {data.get('official_title')}")
|
||||
for download in downloads:
|
||||
|
||||
@@ -8,6 +8,7 @@ from .download_client import DownloadClient
|
||||
|
||||
from module.conf import settings
|
||||
from module.parser import TitleParser
|
||||
from ..network import PostNotification, ServerChanNotification
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -53,6 +54,7 @@ class Renamer:
|
||||
return path_name, season, folder_name, suffix, download_path
|
||||
|
||||
def run(self):
|
||||
notification = ServerChanNotification()
|
||||
recent_info, torrent_count = self.get_torrent_info()
|
||||
rename_count = 0
|
||||
for info in recent_info:
|
||||
@@ -68,6 +70,7 @@ class Renamer:
|
||||
old_name = info.content_path.replace(info.save_path, "")
|
||||
self.client.rename_torrent_file(torrent_hash, new_name, old_name, new_name)
|
||||
rename_count += 1
|
||||
notification.send_msg(f"《{name[:10]}》缓存成功", f"[Auto Bangumi]《{name}》缓存成功")
|
||||
else:
|
||||
continue
|
||||
except Exception as e:
|
||||
|
||||
@@ -14,11 +14,10 @@ logger = logging.getLogger(__name__)
|
||||
class RSSAnalyser:
|
||||
def __init__(self):
|
||||
self._title_analyser = TitleParser()
|
||||
self._request = RequestContent()
|
||||
|
||||
def rss_to_datas(self, bangumi_info: list) -> list:
|
||||
rss_torrents = self._request.get_torrents(settings.rss_parser.link)
|
||||
self._request.close_session()
|
||||
with RequestContent() as req:
|
||||
rss_torrents = req.get_torrents(settings.rss_parser.link)
|
||||
for torrent in rss_torrents:
|
||||
raw_title = torrent.name
|
||||
extra_add = True
|
||||
@@ -35,8 +34,8 @@ class RSSAnalyser:
|
||||
return bangumi_info
|
||||
|
||||
def rss_to_data(self, url) -> dict:
|
||||
rss_torrents = self._request.get_torrents(url)
|
||||
self._request.close_session()
|
||||
with RequestContent() as req:
|
||||
rss_torrents = req.get_torrents(url)
|
||||
for torrent in rss_torrents:
|
||||
try:
|
||||
data = self._title_analyser.return_dict(torrent.name)
|
||||
|
||||
@@ -1,6 +1,2 @@
|
||||
from .request_contents import RequestContent
|
||||
from .notification import PostNotification
|
||||
|
||||
|
||||
|
||||
|
||||
from .notification import PostNotification, ServerChanNotification
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
import logging
|
||||
|
||||
import requests
|
||||
|
||||
from module.conf import settings
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PostNotification:
|
||||
def __init__(self):
|
||||
self.token = settings.notification_token
|
||||
@@ -11,4 +16,33 @@ class PostNotification:
|
||||
def ifttt_post(self, message):
|
||||
url = self.notification_url(message)
|
||||
response = requests.get(url)
|
||||
return response.status_code == 200
|
||||
return response.status_code == 200
|
||||
|
||||
|
||||
class ServerChanNotification:
|
||||
"""Server酱推送"""
|
||||
|
||||
def __init__(self):
|
||||
self.token = settings.notification.token
|
||||
self.notification_url = f"https://sctapi.ftqq.com/{self.token}.send"
|
||||
|
||||
def send_msg(self, title: str, desp: str) -> bool:
|
||||
if not settings.notification.enable:
|
||||
return False
|
||||
data = {
|
||||
"title": title,
|
||||
"desp": desp,
|
||||
}
|
||||
try:
|
||||
resp = requests.post(self.notification_url, json=data, timeout=3)
|
||||
resp.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
logging.error("[ServerChanNotification] send fail, error: %s" % e)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
name = "勇者、辞职不干了"
|
||||
notification = ServerChanNotification()
|
||||
notification.send_msg(f"《{name[:10]}》缓存成功", f"[Auto Bangumi]《{name}》缓存成功")
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .request_url import RequestURL
|
||||
from module.conf import settings
|
||||
|
||||
@@ -7,19 +9,17 @@ import re
|
||||
|
||||
FILTER = "|".join(settings.rss_parser.filter)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TorrentInfo:
|
||||
name: str
|
||||
torrent_link: str
|
||||
|
||||
|
||||
class RequestContent:
|
||||
def __init__(self):
|
||||
self._req = RequestURL()
|
||||
|
||||
class RequestContent(RequestURL):
|
||||
# Mikanani RSS
|
||||
def get_torrents(self, _url: str) -> [TorrentInfo]:
|
||||
soup = self._req.get_content(_url)
|
||||
soup = self.get_xml(_url)
|
||||
torrent_titles = [item.title.string for item in soup.find_all("item")]
|
||||
torrent_urls = [item.get("url") for item in soup.find_all("enclosure")]
|
||||
torrents = []
|
||||
@@ -29,14 +29,14 @@ class RequestContent:
|
||||
return torrents
|
||||
|
||||
def get_torrent(self, _url) -> TorrentInfo:
|
||||
soup = self._req.get_content(_url)
|
||||
soup = self.get_xml(_url)
|
||||
item = soup.find("item")
|
||||
enclosure = item.find("enclosure")
|
||||
return TorrentInfo(item.title.string, enclosure["url"])
|
||||
|
||||
def get_xml(self, url):
|
||||
return BeautifulSoup(self.get_url(url).text, "xml")
|
||||
|
||||
# API JSON
|
||||
def get_json(self, _url) -> dict:
|
||||
return self._req.get_content(_url, content="json")
|
||||
|
||||
def close_session(self):
|
||||
self._req.close()
|
||||
return self.get_url(_url).json()
|
||||
|
||||
@@ -5,8 +5,6 @@ import socket
|
||||
import socks
|
||||
import logging
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from module.conf import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -14,7 +12,31 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class RequestURL:
|
||||
def __init__(self):
|
||||
self.session = requests.session()
|
||||
self.header = {
|
||||
"user-agent": "Mozilla/5.0",
|
||||
"Accept": "application/xml"
|
||||
}
|
||||
|
||||
def get_url(self, url):
|
||||
times = 0
|
||||
while times < 5:
|
||||
try:
|
||||
req = self.session.get(url=url, headers=self.header)
|
||||
req.raise_for_status()
|
||||
return req
|
||||
except requests.RequestException as e:
|
||||
logger.debug(f"URL: {url}")
|
||||
logger.debug(e)
|
||||
logger.warning("ERROR with Connection.Please check DNS/Connection settings")
|
||||
time.sleep(5)
|
||||
times += 1
|
||||
except Exception as e:
|
||||
logger.debug(f"URL: {url}")
|
||||
logger.debug(e)
|
||||
break
|
||||
|
||||
def __enter__(self):
|
||||
self.session = requests.Session()
|
||||
if settings.proxy.enable:
|
||||
if settings.proxy.type == "http":
|
||||
url = f"http://{settings.proxy.host}:{settings.proxy.port}"
|
||||
@@ -26,31 +48,8 @@ class RequestURL:
|
||||
socks.set_default_proxy(socks.SOCKS5, addr=settings.proxy.host, port=settings.proxy.port, rdns=True,
|
||||
username=settings.proxy.username, password=settings.proxy.password)
|
||||
socket.socket = socks.socksocket
|
||||
self.header = {
|
||||
"user-agent": "Mozilla/5.0",
|
||||
"Accept": "application/xml"
|
||||
}
|
||||
|
||||
def get_url(self, url):
|
||||
times = 0
|
||||
while times < 5:
|
||||
try:
|
||||
req = self.session.get(url=url, headers=self.header)
|
||||
return req
|
||||
except Exception as e:
|
||||
logger.debug(f"URL: {url}")
|
||||
logger.debug(e)
|
||||
logger.warning("ERROR with Connection.Please check DNS/Connection settings")
|
||||
time.sleep(5)
|
||||
times += 1
|
||||
|
||||
def get_content(self, url, content="xml"):
|
||||
if content == "xml":
|
||||
return BeautifulSoup(self.get_url(url).text, content)
|
||||
elif content == "json":
|
||||
return self.get_url(url).json()
|
||||
|
||||
def close(self):
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.session.close()
|
||||
|
||||
|
||||
|
||||
@@ -7,11 +7,11 @@ class BgmAPI:
|
||||
f"https://api.bgm.tv/search/subject/{e}?type=2"
|
||||
self.info_url = lambda e: \
|
||||
f"https://api.bgm.tv/subject/{e}"
|
||||
self._request = RequestContent()
|
||||
|
||||
def search(self, title):
|
||||
url = self.search_url(title)
|
||||
contents = self._request.get_json(url)["list"]
|
||||
if contents.__len__() == 0:
|
||||
return None
|
||||
return contents[0]["name"], contents[0]["name_cn"]
|
||||
with RequestContent() as req:
|
||||
contents = req.get_json(url)["list"]
|
||||
if contents.__len__() == 0:
|
||||
return None
|
||||
return contents[0]["name"], contents[0]["name_cn"]
|
||||
|
||||
@@ -22,14 +22,14 @@ class TMDBMatcher:
|
||||
f"https://api.themoviedb.org/3/search/tv?api_key={TMDB_API}&page=1&query={e}&include_adult=false"
|
||||
self.info_url = lambda e: \
|
||||
f"https://api.themoviedb.org/3/tv/{e}?api_key={TMDB_API}&language=zh-CN"
|
||||
self._request = RequestContent()
|
||||
|
||||
def is_animation(self, tv_id) -> bool:
|
||||
url_info = self.info_url(tv_id)
|
||||
type_id = self._request.get_json(url_info)["genres"]
|
||||
for type in type_id:
|
||||
if type.get("id") == 16:
|
||||
return True
|
||||
with RequestContent() as req:
|
||||
type_id = req.get_json(url_info)["genres"]
|
||||
for type in type_id:
|
||||
if type.get("id") == 16:
|
||||
return True
|
||||
return False
|
||||
|
||||
# def get_zh_title(self, id):
|
||||
@@ -51,20 +51,20 @@ class TMDBMatcher:
|
||||
return int(re.findall(r"\d", season.get("season"))[0])
|
||||
|
||||
def tmdb_search(self, title) -> TMDBInfo:
|
||||
url = self.search_url(title)
|
||||
contents = self._request.get_json(url).get("results")
|
||||
if contents.__len__() == 0:
|
||||
url = self.search_url(title.replace(" ", ""))
|
||||
contents = self._request.get_json(url).get("results")
|
||||
# 判断动画
|
||||
for content in contents:
|
||||
id = content["id"]
|
||||
if self.is_animation(id):
|
||||
break
|
||||
url_info = self.info_url(id)
|
||||
info_content = self._request.get_json(url_info)
|
||||
# 关闭链接
|
||||
self._request.close()
|
||||
with RequestContent() as req:
|
||||
url = self.search_url(title)
|
||||
contents = req.get_json(url).get("results")
|
||||
if contents.__len__() == 0:
|
||||
url = self.search_url(title.replace(" ", ""))
|
||||
contents = req.get_json(url).get("results")
|
||||
# 判断动画
|
||||
for content in contents:
|
||||
id = content["id"]
|
||||
if self.is_animation(id):
|
||||
break
|
||||
url_info = self.info_url(id)
|
||||
info_content = req.get_json(url_info)
|
||||
|
||||
season = [{"season": s.get("name"), "air_date": s.get("air_date")} for s in info_content.get("seasons")]
|
||||
last_season = self.get_season(season)
|
||||
title_jp = info_content.get("original_name")
|
||||
|
||||
Reference in New Issue
Block a user